1 18 package freecs.core; 19 20 import freecs.*; 21 import freecs.interfaces.*; 22 23 import java.nio.*; 24 import java.nio.channels.*; 25 import java.util.*; 26 27 28 public class RequestReader extends Thread { 29 public static final short WAITING = 0, 30 EVAL_GET_MESSAGES_APND2WRITE = 1, 31 EVAL_GET_MESSAGES_SND_MSGS=2, 32 EVAL_GET_MESSAGES=3, 33 EVAL_GET_STATE=4, 34 EVAL_GET=5, 35 EVAL_POST=6, 36 EVAL_POST_LOGIN=7, 37 EVAL_PREP4SEND=8, 38 EVAL_SEND=9, 39 EVAL_SENDFINAL=10, 40 EVALUATE_COMMAND=11, 41 EVALUATING=12, 42 PARSE_MSG=13, 43 READING=14, 44 EVAL_POST_LOGIN_RESULT=15, 45 TRYLOGIN=16, 46 TRYLOGIN_AUTHENTICATE=17, 47 TRYLOGIN_CHECK_FRIENDS=18, 48 TRYLOGIN_CHECK4PRESENCE=19, 49 TRYLOGIN_CORRECT_PERMISSION=20, 50 TRYLOGIN_SCHEDULE_FRIENDMSGS=21, 51 TRYLOGIN_SCHEDULE_VIPMSG=22, 52 TRYLOGIN_SEND_LOGINMSG=23, 53 TRYLOGIN_SET_GROUP=24, 54 TRYLOGIN_SET_PERMISSION=25; 55 56 57 private static Vector reqReaders = new Vector (); 58 private static short readerID = 0; 59 60 private long shutdowntime; 61 private short ID; 62 private ByteBuffer buf; 63 private RequestEvaluator evaluator; 64 public RequestQueue reqQueue; 65 public boolean isFixed, working; 66 public long workstart; 67 68 public volatile IRequest currentRequest=null; 69 70 public short currPosition; 71 public String currCommand; 72 73 private RequestReader(short id) { 74 this.ID = id; 75 reqQueue = new RequestQueue (this); 76 if (Server.TRACE_CREATE_AND_FINALIZE) 77 Server.log (this, "++++++++++++++++++++++++++++++++++++++++CREATE", Server.MSG_STATE, Server.LVL_VERY_VERBOSE); 78 } 79 80 public static boolean processRequest (SelectionKey sk) { 81 if (!CentralSelector.isSkValid(sk)) { 82 Server.log("static RequestReader", "processRequest: current request has invalid key", Server.MSG_STATE, Server.LVL_VERBOSE); 83 return true; 84 } 85 float min = Server.srv.READER_MAX_QUEUE; 92 float incr = ((float) Server.srv.READER_MAX_QUEUE) / Server.srv.MAX_READERS; 93 int rrSizeBorder = (int) (reqReaders.size()/1.5); 94 RequestReader minReader = null; 95 for (int i = 0; i < reqReaders.size(); i++) { 96 RequestReader r = (RequestReader) reqReaders.elementAt(i); 97 int rqSize = r.reqQueue.size(); 98 if (i < rrSizeBorder && rqSize==0) { 99 minReader=r; 100 break; 101 } 102 float factor = ((float) rqSize) + i * incr; 103 if (factor < min && !r.isSuspending()) { 104 min = factor; 105 minReader=r; 106 } 107 } 108 if (minReader == null) { 109 minReader = RequestReader.startRequestReader(false); 110 } 111 if (minReader == null) { 112 int lowestQueue = Server.srv.READER_MAX_QUEUE+1; 116 for (int i = 0; i< reqReaders.size(); i++) { 117 RequestReader r = (RequestReader) reqReaders.elementAt(i); 118 int factor = r.reqQueue.size(); 119 if (r.working) 120 factor++; 121 if (factor < lowestQueue) 122 minReader = r; 123 } 124 if (minReader==null) 125 return false; 126 } 127 minReader.reqQueue.addKey(sk); 128 return true; 129 } 130 131 private void restart() { 132 Server.log (this, "trying to restart dead thread", Server.MSG_STATE, Server.LVL_MAJOR); 133 this.start(); 134 } 135 136 public static boolean[] getAliveState () { 137 boolean[] res = new boolean[reqReaders.size()]; 138 for (int i = 0; i<res.length; i++) { 139 RequestReader r = (RequestReader) reqReaders.elementAt(i); 140 res[i] = r.isAlive(); 141 if (!res[i]) 142 r.restart(); 143 } 144 return res; 145 } 146 147 public static long[][] getWorkingSince () { 148 long[][] res = new long[reqReaders.size()][2]; 149 for (int i = 0; i<res.length; i++) { 150 RequestReader r = (RequestReader) reqReaders.elementAt(i); 151 if (r.working) 152 res[i][0] = r.workstart; 153 else 154 res[i][0] = 0; 155 res[i][1]=r.currPosition; 156 } 157 return res; 158 } 159 160 public static String getCurrCommant (int idx) { 161 return ((RequestReader) reqReaders.elementAt(idx)).currCommand; 162 } 163 164 public static double[] getOveralUsage () { 165 double[] res = new double[reqReaders.size()]; 166 for (int i = 0; i < res.length; i++) { 167 RequestReader r = (RequestReader) reqReaders.elementAt(i); 168 res[i] = r.reqQueue.getUsage(); 169 } 170 return res; 171 } 172 173 181 public static RequestReader startRequestReader (boolean fixed) { 182 if (activeReaders () >= Server.srv.MAX_READERS) 183 return null; 184 short cid = readerID++; 185 RequestReader reqReader = new RequestReader (cid); 186 reqReader.isFixed = fixed; 187 if (readerID == Short.MAX_VALUE) 188 readerID = Short.MIN_VALUE; 189 reqReaders.add (reqReader); 190 if (fixed) { 191 reqReader.setName ("FIXED-RequestReader " + cid); 192 StringBuffer tsb = new StringBuffer ("Thread START: (FIXED THREAD, "); 194 tsb.append (reqReaders.size ()); 195 tsb.append (" threads running)"); 196 Server.log ("static RequestReader", tsb.toString (), Server.MSG_STATE, Server.LVL_MAJOR); 197 } else { 198 reqReader.setName ("RequestReader " + cid); 199 StringBuffer tsb = new StringBuffer ("Thread START: (").append (reqReaders.size ()).append (" threads running)"); 201 Server.log ("static RequestReader", tsb.toString (), Server.MSG_STATE, Server.LVL_MINOR); 202 } 203 reqReader.start (); 204 return reqReader; 205 } 206 207 211 public static void removeRequestReader (RequestReader reqReader) { 212 reqReaders.remove (reqReader); 213 StringBuffer tsb= new StringBuffer ("Thread STOP: (").append (reqReaders.size ()).append (" threads running)"); 214 Server.log ("static RequestReader", tsb.toString (), Server.MSG_STATE, Server.LVL_MINOR); 215 } 216 217 221 public static int activeReaders () { 222 return reqReaders.size (); 223 } 224 225 234 public void run() { 235 buf = ByteBuffer.allocate(Server.srv.READBUFFER_SIZE); 236 evaluator = new RequestEvaluator (this); 237 long lastReadTime = System.currentTimeMillis (); 238 shutdowntime = 0; 239 long lastMessage = 0; 240 boolean suspend = false; 241 while (!suspend) try { 242 if (Server.DEBUG || lastMessage + 5000 > System.currentTimeMillis()) { 243 Server.log (this, "loopstart", Server.MSG_STATE, Server.LVL_VERY_VERBOSE); 244 lastMessage = System.currentTimeMillis(); 245 } 246 currPosition=WAITING; 247 if (!Server.srv.isRunning ()) { 248 if (shutdowntime == 0) 249 shutdowntime = System.currentTimeMillis () + 150000; 250 if (shutdowntime < System.currentTimeMillis ()) { 251 suspend = true; 252 break; 253 } 254 } 255 long diff = Server.srv.READER_MAX_IDLETIME; 256 if (!this.isFixed) { 257 diff = System.currentTimeMillis () - lastReadTime; 259 if (diff > Server.srv.READER_MAX_IDLETIME 260 && activeReaders () > 1 261 && reqQueue.size() < 1) 262 break; 263 if (diff > Server.srv.READER_MAX_IDLETIME) 264 diff = Server.srv.READER_MAX_IDLETIME; 265 } 266 SelectionKey sk; 267 if (Server.srv.USE_CENTRAL_REQUESTQUEUE) { 268 synchronized (CentralSelector.cSel.reqQueue) { 269 if (CentralSelector.cSel.reqQueue.size() < 1) try { 270 CentralSelector.cSel.reqQueue.wait(Server.srv.READER_MAX_IDLETIME - diff); 271 } catch (InterruptedException ie) { } 272 sk = (SelectionKey) CentralSelector.cSel.reqQueue.pop(); 273 CentralSelector.cSel.reqQueue.notify(); 274 } 275 } else { 276 sk = reqQueue.popKey(diff); } 278 if (sk == null) { 279 try { 280 Thread.sleep (33); 281 } catch (InterruptedException ie) { } 282 continue; 283 } 284 currPosition=READING; 285 working = true; 286 workstart = lastReadTime = System.currentTimeMillis (); 287 long start = System.currentTimeMillis(); 288 StringBuffer sb = new StringBuffer (); 289 try { 290 ConnectionBuffer cb = (ConnectionBuffer) sk.attachment(); 291 293 evaluate(sk, cb); 296 sb.append ("evaluate: took "); 297 301 } catch (Exception e) { 303 Server.debug (this, "catched Exception while reading/evaluating", e, Server.MSG_ERROR, Server.LVL_MAJOR); 304 try { 305 Thread.sleep (33); 306 } catch (InterruptedException ie) { } 307 continue; 308 } finally { 309 RequestMonitor.instance.removeMonitor(this); 310 working = false; 311 } 312 long proctime = System.currentTimeMillis () - start; 313 if (Server.checkLogLvl (Server.MSG_STATE, Server.LVL_VERY_VERBOSE)) { 314 sb.append (proctime); 315 sb.append (" millis "); 316 if (currentRequest == null) { 317 sb.append ("reading"); 318 } else { 319 sb.append ("reading and processing "); 320 sb.append (currentRequest.toString()); 321 } 322 Server.log (this, sb.toString(), Server.MSG_STATE, Server.LVL_VERBOSE); 323 } 324 } catch (Exception e) { 325 Server.debug (this, "(outer loop): ", e, Server.MSG_ERROR, Server.LVL_MAJOR); 326 } 327 removeRequestReader (this); 328 } 329 330 private void evaluate(SelectionKey sk, ConnectionBuffer cb) { 331 if (cb == null) { 332 Server.log(this, "ConnectionBuffer was null for Selectionkey", Server.MSG_ERROR, Server.LVL_MAJOR); 333 return; 334 } 335 if (!cb.isValid()) { 336 CentralSelector.dropKey(sk); 337 return; 338 } 339 if (cb.currentRequest != null) { 340 currentRequest = cb.currentRequest; 341 cb.currentRequest = null; 342 evaluator.evaluate (currentRequest); 343 } 344 } 345 346 public boolean isSuspending () { 347 return (shutdowntime != 0 && !isFixed); 348 } 349 public int hashCode () { return (int) ID; } 350 public boolean equals (RequestReader r) { return r.getID () == ID; } 351 public short getID () { return ID; } 352 353 private volatile String strgVal=null; 354 public String toString () { 355 if (strgVal == null) { 356 StringBuffer sb = new StringBuffer ("[RequestReader "); 357 if (ID < 10) { 358 sb.append (" "); 359 } else if (ID < 10) { 360 sb.append (" "); 361 } else if (ID < 100) { 362 sb.append (" "); 363 } else if (ID < 1000) { 364 sb.append (" "); 365 } 366 sb.append (ID); 367 sb.append ("]"); 368 strgVal = sb.toString(); 369 } 370 return (strgVal); 371 } 372 373 public void finalize() { 374 if (Server.TRACE_CREATE_AND_FINALIZE) 375 Server.log(this, "----------------------------------------FINALIZED", Server.MSG_STATE, Server.LVL_VERY_VERBOSE); 376 } 377 } | Popular Tags |