KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > freecs > core > RequestReader


1 /**
2  * Copyright (C) 2003 Manfred Andres
3  *
4  * This program is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU General Public License
6  * as published by the Free Software Foundation; either version 2
7  * of the License, or (at your option) any later version.
8  *
9  * This program is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12  * GNU General Public License for more details.
13  *
14  * You should have received a copy of the GNU General Public License
15  * along with this program; if not, write to the Free Software
16  * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
17  */

18 package freecs.core;
19
20 import freecs.*;
21 import freecs.interfaces.*;
22
23 import java.nio.*;
24 import java.nio.channels.*;
25 import java.util.*;
26
27
28 public class RequestReader extends Thread JavaDoc {
29     public static final short WAITING = 0,
30                             EVAL_GET_MESSAGES_APND2WRITE = 1,
31                             EVAL_GET_MESSAGES_SND_MSGS=2,
32                             EVAL_GET_MESSAGES=3,
33                             EVAL_GET_STATE=4,
34                             EVAL_GET=5,
35                             EVAL_POST=6,
36                             EVAL_POST_LOGIN=7,
37                             EVAL_PREP4SEND=8,
38                             EVAL_SEND=9,
39                             EVAL_SENDFINAL=10,
40                             EVALUATE_COMMAND=11,
41                             EVALUATING=12,
42                             PARSE_MSG=13,
43                             READING=14,
44                             EVAL_POST_LOGIN_RESULT=15,
45                             TRYLOGIN=16,
46                             TRYLOGIN_AUTHENTICATE=17,
47                             TRYLOGIN_CHECK_FRIENDS=18,
48                             TRYLOGIN_CHECK4PRESENCE=19,
49                             TRYLOGIN_CORRECT_PERMISSION=20,
50                             TRYLOGIN_SCHEDULE_FRIENDMSGS=21,
51                             TRYLOGIN_SCHEDULE_VIPMSG=22,
52                             TRYLOGIN_SEND_LOGINMSG=23,
53                             TRYLOGIN_SET_GROUP=24,
54                             TRYLOGIN_SET_PERMISSION=25;
55     
56     
57     private static Vector reqReaders = new Vector ();
58     private static short readerID = 0;
59     
60     private long shutdowntime;
61     private short ID;
62     private ByteBuffer buf;
63     private RequestEvaluator evaluator;
64     public RequestQueue reqQueue;
65     public boolean isFixed, working;
66     public long workstart;
67     
68     public volatile IRequest currentRequest=null;
69     
70     public short currPosition;
71     public String JavaDoc currCommand;
72     
73     private RequestReader(short id) {
74         this.ID = id;
75         reqQueue = new RequestQueue (this);
76         if (Server.TRACE_CREATE_AND_FINALIZE)
77             Server.log (this, "++++++++++++++++++++++++++++++++++++++++CREATE", Server.MSG_STATE, Server.LVL_VERY_VERBOSE);
78     }
79     
80     public static boolean processRequest (SelectionKey sk) {
81         if (!CentralSelector.isSkValid(sk)) {
82             Server.log("static RequestReader", "processRequest: current request has invalid key", Server.MSG_STATE, Server.LVL_VERBOSE);
83             return true;
84         }
85         // this is the work-to-thread-algorithm
86
// threads at the beginning have to get more requests, than threads at the
87
// end of the thread-list, to enable the threads at the end of the thread-list
88
// to quit working on lower usage. This is done by considering the queue usage and
89
// the list-index.
90
// factor = RequestQueue.size() + idx*(READER_MAX_QUEUE / MAX_READERS)
91
float min = Server.srv.READER_MAX_QUEUE;
92         float incr = ((float) Server.srv.READER_MAX_QUEUE) / Server.srv.MAX_READERS;
93         int rrSizeBorder = (int) (reqReaders.size()/1.5);
94         RequestReader minReader = null;
95         for (int i = 0; i < reqReaders.size(); i++) {
96             RequestReader r = (RequestReader) reqReaders.elementAt(i);
97             int rqSize = r.reqQueue.size();
98             if (i < rrSizeBorder && rqSize==0) {
99                 minReader=r;
100                 break;
101             }
102             float factor = ((float) rqSize) + i * incr;
103             if (factor < min && !r.isSuspending()) {
104                 min = factor;
105                 minReader=r;
106             }
107         }
108         if (minReader == null) {
109             minReader = RequestReader.startRequestReader(false);
110         }
111         if (minReader == null) {
112             // if no minReader may be started and every reader's factor is too high
113
// we will have to loop over all threads and get the lowest requestqueue-size
114
// to deliver this request
115
int lowestQueue = Server.srv.READER_MAX_QUEUE+1;
116             for (int i = 0; i< reqReaders.size(); i++) {
117                 RequestReader r = (RequestReader) reqReaders.elementAt(i);
118                 int factor = r.reqQueue.size();
119                 if (r.working)
120                     factor++;
121                 if (factor < lowestQueue)
122                     minReader = r;
123             }
124             if (minReader==null)
125                 return false;
126         }
127         minReader.reqQueue.addKey(sk);
128         return true;
129     }
130     
131     private void restart() {
132         Server.log (this, "trying to restart dead thread", Server.MSG_STATE, Server.LVL_MAJOR);
133         this.start();
134     }
135
136     public static boolean[] getAliveState () {
137         boolean[] res = new boolean[reqReaders.size()];
138         for (int i = 0; i<res.length; i++) {
139             RequestReader r = (RequestReader) reqReaders.elementAt(i);
140             res[i] = r.isAlive();
141             if (!res[i])
142                 r.restart();
143         }
144         return res;
145     }
146
147     public static long[][] getWorkingSince () {
148         long[][] res = new long[reqReaders.size()][2];
149         for (int i = 0; i<res.length; i++) {
150             RequestReader r = (RequestReader) reqReaders.elementAt(i);
151             if (r.working)
152                 res[i][0] = r.workstart;
153             else
154                 res[i][0] = 0;
155             res[i][1]=r.currPosition;
156         }
157         return res;
158     }
159     
160     public static String JavaDoc getCurrCommant (int idx) {
161         return ((RequestReader) reqReaders.elementAt(idx)).currCommand;
162     }
163     
164     public static double[] getOveralUsage () {
165         double[] res = new double[reqReaders.size()];
166         for (int i = 0; i < res.length; i++) {
167             RequestReader r = (RequestReader) reqReaders.elementAt(i);
168             res[i] = r.reqQueue.getUsage();
169         }
170         return res;
171     }
172
173     /**
174      * starts a new requestreader-thread and possibly makes it as
175      * a fixed thread. A fixed thread will only suspend if the server
176      * shuts down. If the maximum number of configured RequestReader-threads
177      * is reached, null will be returned.
178      * @param fixed markes it as fixed thread if true
179      * @return the RequestReader
180      */

181     public static RequestReader startRequestReader (boolean fixed) {
182         if (activeReaders () >= Server.srv.MAX_READERS)
183             return null;
184         short cid = readerID++;
185         RequestReader reqReader = new RequestReader (cid);
186         reqReader.isFixed = fixed;
187         if (readerID == Short.MAX_VALUE)
188             readerID = Short.MIN_VALUE;
189         reqReaders.add (reqReader);
190         if (fixed) {
191             reqReader.setName ("FIXED-RequestReader " + cid);
192             // reqReader.setPriority (Thread.MAX_PRIORITY-1);
193
StringBuffer JavaDoc tsb = new StringBuffer JavaDoc ("Thread START: (FIXED THREAD, ");
194             tsb.append (reqReaders.size ());
195             tsb.append (" threads running)");
196             Server.log ("static RequestReader", tsb.toString (), Server.MSG_STATE, Server.LVL_MAJOR);
197         } else {
198             reqReader.setName ("RequestReader " + cid);
199             // reqReader.setPriority (Thread.MAX_PRIORITY-1);
200
StringBuffer JavaDoc tsb = new StringBuffer JavaDoc ("Thread START: (").append (reqReaders.size ()).append (" threads running)");
201             Server.log ("static RequestReader", tsb.toString (), Server.MSG_STATE, Server.LVL_MINOR);
202         }
203         reqReader.start ();
204         return reqReader;
205     }
206
207     /**
208      * removes a requestreader from the requestreader-list
209      * @param reqReader the requestreader to remove
210      */

211    public static void removeRequestReader (RequestReader reqReader) {
212       reqReaders.remove (reqReader);
213       StringBuffer JavaDoc tsb= new StringBuffer JavaDoc ("Thread STOP: (").append (reqReaders.size ()).append (" threads running)");
214       Server.log ("static RequestReader", tsb.toString (), Server.MSG_STATE, Server.LVL_MINOR);
215    }
216
217     /**
218      * returns the number of requestreaders in the requestreader-list
219      * @return number of requestreaders
220      */

221    public static int activeReaders () {
222       return reqReaders.size ();
223    }
224
225     /**
226      * the work of a requestreader is done her
227      * .) check if the request-queue has something to read
228      * .) if there is nothing and the time between the last read
229      * and now is higher than Server.READER_MAX_IDLETIME and the
230      * thread is not marked as fixed, the RequestReader will suspend
231      * .) if there is something to read it will be pricessed
232      * .) if the processed request is complete, it will be evaluated
233      */

234     public void run() {
235         buf = ByteBuffer.allocate(Server.srv.READBUFFER_SIZE);
236         evaluator = new RequestEvaluator (this);
237         long lastReadTime = System.currentTimeMillis ();
238         shutdowntime = 0;
239         long lastMessage = 0;
240         boolean suspend = false;
241         while (!suspend) try {
242             if (Server.DEBUG || lastMessage + 5000 > System.currentTimeMillis()) {
243                 Server.log (this, "loopstart", Server.MSG_STATE, Server.LVL_VERY_VERBOSE);
244                 lastMessage = System.currentTimeMillis();
245             }
246             currPosition=WAITING;
247             if (!Server.srv.isRunning ()) {
248                 if (shutdowntime == 0)
249                     shutdowntime = System.currentTimeMillis () + 150000;
250                 if (shutdowntime < System.currentTimeMillis ()) {
251                     suspend = true;
252                     break;
253                 }
254             }
255             long diff = Server.srv.READER_MAX_IDLETIME;
256             if (!this.isFixed) {
257                 // if this reader was idle too long, make this thread disapear
258
diff = System.currentTimeMillis () - lastReadTime;
259                 if (diff > Server.srv.READER_MAX_IDLETIME
260                     && activeReaders () > 1
261                     && reqQueue.size() < 1)
262                     break;
263                 if (diff > Server.srv.READER_MAX_IDLETIME)
264                     diff = Server.srv.READER_MAX_IDLETIME;
265             }
266             SelectionKey sk;
267             if (Server.srv.USE_CENTRAL_REQUESTQUEUE) {
268                 synchronized (CentralSelector.cSel.reqQueue) {
269                     if (CentralSelector.cSel.reqQueue.size() < 1) try {
270                         CentralSelector.cSel.reqQueue.wait(Server.srv.READER_MAX_IDLETIME - diff);
271                     } catch (InterruptedException JavaDoc ie) { }
272                     sk = (SelectionKey) CentralSelector.cSel.reqQueue.pop();
273                     CentralSelector.cSel.reqQueue.notify();
274                 }
275             } else {
276                 sk = reqQueue.popKey(diff); // reqQueue.getKey (diff);
277
}
278             if (sk == null) {
279                 try {
280                    Thread.sleep (33);
281                 } catch (InterruptedException JavaDoc ie) { }
282                 continue;
283             }
284             currPosition=READING;
285             working = true;
286             workstart = lastReadTime = System.currentTimeMillis ();
287             long start = System.currentTimeMillis();
288             StringBuffer JavaDoc sb = new StringBuffer JavaDoc();
289             try {
290                 ConnectionBuffer cb = (ConnectionBuffer) sk.attachment();
291 // synchronized (cb) {
292
/* if (Server.srv.CENTRALSELECTOR_PARSES_REQUEST) { */
293 // FIXME: evaluate() and everything below should check for interrup-requests by
294
// the RequestMonitor and throw an interrupted-exception when it is encountered
295
evaluate(sk, cb);
296                         sb.append ("evaluate: took ");
297 /* } else {
298                         read(sk, cb);
299                         sb.append ("read: ended. took me ");
300                     // } */

301 // }
302
} catch (Exception JavaDoc e) {
303                 Server.debug (this, "catched Exception while reading/evaluating", e, Server.MSG_ERROR, Server.LVL_MAJOR);
304                 try {
305                     Thread.sleep (33);
306                 } catch (InterruptedException JavaDoc ie) { }
307                 continue;
308             } finally {
309                 RequestMonitor.instance.removeMonitor(this);
310                 working = false;
311             }
312             long proctime = System.currentTimeMillis () - start;
313             if (Server.checkLogLvl (Server.MSG_STATE, Server.LVL_VERY_VERBOSE)) {
314                 sb.append (proctime);
315                 sb.append (" millis ");
316                 if (currentRequest == null) {
317                     sb.append ("reading");
318                 } else {
319                     sb.append ("reading and processing ");
320                     sb.append (currentRequest.toString());
321                 }
322                 Server.log (this, sb.toString(), Server.MSG_STATE, Server.LVL_VERBOSE);
323             }
324         } catch (Exception JavaDoc e) {
325             Server.debug (this, "(outer loop): ", e, Server.MSG_ERROR, Server.LVL_MAJOR);
326         }
327         removeRequestReader (this);
328     }
329     
330     private void evaluate(SelectionKey sk, ConnectionBuffer cb) {
331         if (cb == null) {
332             Server.log(this, "ConnectionBuffer was null for Selectionkey", Server.MSG_ERROR, Server.LVL_MAJOR);
333             return;
334         }
335         if (!cb.isValid()) {
336             CentralSelector.dropKey(sk);
337             return;
338         }
339         if (cb.currentRequest != null) {
340             currentRequest = cb.currentRequest;
341             cb.currentRequest = null;
342             evaluator.evaluate (currentRequest);
343         }
344     }
345
346     public boolean isSuspending () {
347         return (shutdowntime != 0 && !isFixed);
348     }
349     public int hashCode () { return (int) ID; }
350     public boolean equals (RequestReader r) { return r.getID () == ID; }
351     public short getID () { return ID; }
352    
353     private volatile String JavaDoc strgVal=null;
354     public String JavaDoc toString () {
355         if (strgVal == null) {
356             StringBuffer JavaDoc sb = new StringBuffer JavaDoc("[RequestReader ");
357             if (ID < 10) {
358                 sb.append (" ");
359             } else if (ID < 10) {
360                 sb.append (" ");
361             } else if (ID < 100) {
362                 sb.append (" ");
363             } else if (ID < 1000) {
364                 sb.append (" ");
365             }
366             sb.append (ID);
367             sb.append ("]");
368             strgVal = sb.toString();
369         }
370         return (strgVal);
371     }
372
373     public void finalize() {
374         if (Server.TRACE_CREATE_AND_FINALIZE)
375             Server.log(this, "----------------------------------------FINALIZED", Server.MSG_STATE, Server.LVL_VERY_VERBOSE);
376     }
377 }
Popular Tags