package freecs.core;

import freecs.Server;
import freecs.util.ObjectBuffer;
import java.util.Set;
import java.util.Iterator;
import java.io.IOException;
import java.net.Socket;
import java.nio.channels.Selector;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.nio.channels.spi.SelectorProvider;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.CancelledKeyException;

/**
 * Singleton thread owning the NIO {@link Selector} all client channels are
 * registered with. Its main loop closes channels queued in {@link #dropKeys},
 * polls the selector for readable keys and reads incoming bytes into the
 * {@code ConnectionBuffer} attached to each key. Completed requests are
 * handed either to {@link #reqQueue} or directly to a {@code RequestReader},
 * depending on {@code Server.srv.USE_CENTRAL_REQUESTQUEUE}.
 */
public class CentralSelector extends Thread {
    /** Set to true as the last action of {@link #run()}. */
    public static boolean stopped = false;
    /** The one and only instance. */
    public static final CentralSelector cSel = new CentralSelector();
    private Selector sel = null;
    // nextUnavailableMessage: earliest time (millis) at which the next
    // "request rejected" message may be logged (see rejectRequest).
    private long rqLastChecked, nextUnavailableMessage=0;
    /** Keys waiting to be closed by the selector thread. */
    public ObjectBuffer dropKeys;

    /** Keys having a complete request pending (central request queue mode). */
    public ObjectBuffer reqQueue = new ObjectBuffer (Server.srv.MAX_READERS*10);

    private CentralSelector () {
        dropKeys = new ObjectBuffer (10000);
        if (!initCsel ())
            Server.log (this, "construct: unable to init Csel", Server.MSG_ERROR, Server.LVL_HALT);
    }

    /**
     * Opens the selector if there is none yet or the current one is closed.
     * @return true if an open selector is available afterwards
     */
    private boolean initCsel () {
        if (sel == null || !sel.isOpen ()) {
            try {
                sel = SelectorProvider.provider ().openSelector ();
            } catch (IOException ioe) {
                Server.debug (this, "initCsel:", ioe, Server.MSG_ERROR, Server.LVL_HALT);
                return false;
            }
        }
        return sel != null && sel.isOpen ();
    }

    /**
     * Names the singleton thread and starts it unless it is already alive.
     */
    public static void startCentralSelector () {
        cSel.setName("CentralSelector");
        if (!cSel.isAlive())
            cSel.start();
    }

    /**
     * @return the number of keys currently registered with the selector
     */
    public int keyCount () {
        Set keys = sel.keys ();
        return keys.size ();
    }

    /**
     * Switches the channel to non-blocking mode and registers it for
     * OP_READ with a fresh ConnectionBuffer as attachment.
     * A null channel is silently ignored.
     * @param sc the channel to register
     * @param reqType passed on to the ConnectionBuffer constructor
     * @throws IOException if configuring or registering the channel fails
     */
    public void registerSC (SocketChannel sc, int reqType) throws IOException, ClosedChannelException {
        if (sc == null) return;
        sc.configureBlocking (false);
        ConnectionBuffer cb = new ConnectionBuffer (reqType);
        cb.setKey (sc.register (sel, SelectionKey.OP_READ, cb));
    }

    /**
     * Main loop. Runs while the server is running or keys are still
     * registered; once the server has stopped, at most 500 further
     * iterations are made before the loop is left and the selector closed.
     */
    public void run () {
        Server.log (this, "starting up", Server.MSG_STATE, Server.LVL_MINOR);
        int sdc = 500; // shutdown countdown: iterations granted after the server stopped
        long lastMessage = 0;
        Thread katc = new Thread (new KeepAliveTimeoutChecker());
        katc.start();
        while (Server.srv.isRunning () || sel.keys().size() > 0) try {
            if (!Server.srv.isRunning ()) {
                sdc--;
                if (sdc <= 0) break;
            }
            // Log the socket count at most once every five seconds
            // (always when debugging). The comparison used to be inverted,
            // which suppressed the message entirely unless DEBUG was set.
            if (Server.DEBUG || lastMessage + 5000 < System.currentTimeMillis()) {
                Server.log (this, "loopstart: known sockets=" + sel.keys().size(), Server.MSG_STATE, Server.LVL_VERY_VERBOSE);
                lastMessage = System.currentTimeMillis();
            }
            // close everything other threads have queued for dropping
            while (!dropKeys.isEmpty()) {
                SelectionKey sc;
                synchronized (dropKeys) {
                    sc = (SelectionKey) dropKeys.pop();
                    dropKeys.notify();
                }
                implCloseChannel (sc);
            }
            try {
                if (sel.selectNow() < 1) {
                    try {
                        Thread.sleep (33);
                    } catch (InterruptedException ie) { }
                    continue;
                }
            } catch (Exception e) {
                Server.debug (this, "run (select):", e, Server.MSG_ERROR, Server.LVL_MAJOR);
            }
            Set keys = sel.selectedKeys();
            if (keys != null && !keys.isEmpty()) {
                for (Iterator i = keys.iterator (); i.hasNext (); ) {
                    SelectionKey ck = (SelectionKey) i.next ();
                    i.remove();
                    try {
                        if (!CentralSelector.isSkValid(ck)) {
                            Server.log (this, "run: current key is invalid", Server.MSG_STATE, Server.LVL_VERBOSE);
                            continue;
                        }
                        if (ck.isReadable ()) {
                            readIn (ck);
                        }
                    } catch (CancelledKeyException cke) { }
                }
            }
            try {
                Thread.sleep (33);
            } catch (InterruptedException ie) { }
        } catch (Exception e) {
            Server.debug (this, "(outer loop): ", e, Server.MSG_ERROR, Server.LVL_MAJOR);
        }
        katc.interrupt();
        if (sel != null) try {
            Server.log (this, "closing down", Server.MSG_ERROR, Server.LVL_MAJOR);
            sel.close ();
        } catch (Exception e) {
            Server.debug (this, "shutting down: ", e, Server.MSG_ERROR, Server.LVL_MAJOR);
        }
        Server.log (this, "suspended", Server.MSG_ERROR, Server.LVL_MINOR);
        stopped = true;
    }

    /**
     * Reads available bytes from the key's channel into the attached
     * ConnectionBuffer. End of stream or any exception closes the channel.
     * If ConnectionBuffer.append() yields a non-null request it is passed
     * to {@link #addRequest(SelectionKey, ConnectionBuffer)}.
     * @param sk the readable key
     */
    private void readIn (SelectionKey sk) {
        if (!CentralSelector.isSkValid(sk)) {
            Server.log (this, "readIn: current request has invalid key", Server.MSG_STATE, Server.LVL_VERBOSE);
            return;
        }
        ConnectionBuffer cb = (ConnectionBuffer) sk.attachment ();
        int bytesRead;
        try {
            synchronized (cb) {
                SocketChannel sc = (SocketChannel) sk.channel ();
                bytesRead = sc.read (cb.rBuf);
                if (bytesRead < 0) {
                    // end of stream
                    implCloseChannel (sk);
                    return;
                } else if (bytesRead == 0) {
                    Server.log (this, "readIn: no data from socket", Server.MSG_STATE, Server.LVL_VERBOSE);
                    return;
                }
                cb.updateKeepAliveTimeout();
                cb.currentRequest = cb.append();
                if (cb.currentRequest != null) {
                    addRequest(sk, cb);
                }
                return;
            }
        } catch (IOException ioe) {
            Server.debug (this, "readIn: droped key (IOException)", ioe, Server.MSG_ERROR, Server.LVL_VERY_VERBOSE);
            implCloseChannel (sk);
            cb.logError (ioe.getMessage());
        } catch (Exception e) {
            Server.debug (this, "readIn: Exception encountered while reading: ", e, Server.MSG_ERROR, Server.LVL_MAJOR);
            implCloseChannel (sk);
            cb.logError (e.getMessage());
        }
    }

    /**
     * Hands the key over for processing: to the central request queue if
     * {@code Server.srv.USE_CENTRAL_REQUESTQUEUE} is set, otherwise directly
     * to RequestReader.processRequest. If the request is not accepted the
     * channel is closed and the failure is logged (at most once per second).
     * @param sk the key carrying the request
     * @param cb the ConnectionBuffer attached to the key
     */
    public void addRequest(SelectionKey sk, ConnectionBuffer cb) {
        if (Server.srv.USE_CENTRAL_REQUESTQUEUE) {
            if (!this.addRequestToQueue (sk))
                rejectRequest (sk, cb, "RequestQueue is full",
                        "readIn: RequestQueue is full");
        } else if (!RequestReader.processRequest(sk)) {
            rejectRequest (sk, cb, "No available requestreader",
                    "readIn: No availabel requestreader to process request");
        }
    }

    /**
     * Closes the channel of a request nobody could take and logs the reason,
     * limited to one message per second.
     */
    private void rejectRequest (SelectionKey sk, ConnectionBuffer cb, String bufferMsg, String logMsg) {
        implCloseChannel (sk);
        long now = System.currentTimeMillis();
        if (nextUnavailableMessage >= now)
            return;
        cb.logError (bufferMsg);
        Server.log (this, logMsg, Server.MSG_ERROR, Server.LVL_MAJOR);
        // Must be based on the current time. The former "+= 1000" on a value
        // starting at 0 never caught up with the clock, so nothing was throttled.
        nextUnavailableMessage = now + 1000;
    }

    /**
     * Invalidates the attached ConnectionBuffer, schedules the owning user
     * for removal if this key is still the user's key, closes socket and
     * channel and cancels the key. A null key is ignored.
     * @param sk the key to close
     */
    private void implCloseChannel (SelectionKey sk) {
        if (sk == null)
            return; // otherwise the catch block below would throw on sk.cancel()
        try {
            ConnectionBuffer cb = (ConnectionBuffer) sk.attachment();
            if (cb != null) {
                cb.invalidate();
                User u = cb.getUser();
                if (u != null && sk.equals(u.getKey()) && !u.isRemoving() && !u.isLoggedOut()) {
                    StringBuffer sb = new StringBuffer ("implCloseChannel: droped key for user ").append (u.getName ());
                    Server.log ("static CentralSelector", sb.toString (), Server.MSG_STATE, Server.LVL_VERBOSE);
                    u.scheduleToRemove();
                }
            }
            SocketChannel sc = (SocketChannel) sk.channel();
            Responder.res.dropChannel(sc);
            synchronized (sc) {
                Socket s = sc.socket();
                s.close();
                sc.close();
            }
            sk.cancel();
        } catch (Exception e) {
            Server.debug (this, "closeChannel: ", e, Server.MSG_ERROR, Server.LVL_MAJOR);
            sk.cancel();
        }
    }

    /**
     * Invalidates the key's ConnectionBuffer and queues the key for closing
     * by the selector thread. A null key is ignored.
     * @param sk the key to drop
     */
    public static void dropKey (SelectionKey sk) {
        if (sk == null) return;
        ConnectionBuffer cb = (ConnectionBuffer) sk.attachment ();
        if (cb != null) {
            cb.invalidate();
        }
        addToDropKeys (sk);
    }

    /**
     * Drops the given channel. If it has no key for this selector it is
     * closed right away, otherwise its key is handled like in
     * {@link #dropKey(SelectionKey)}. A null channel is ignored.
     * @param sc the channel to drop
     */
    public static void dropChannel (SocketChannel sc) {
        if (sc == null) return;
        SelectionKey sk = sc.keyFor(cSel.sel);
        if (sk == null) {
            try {
                sc.close();
            } catch (IOException e) {
                Server.debug ("static CentralSelector", "dropChannle:", e, Server.MSG_ERROR, Server.LVL_MAJOR);
            }
            return;
        }
        ConnectionBuffer cb = (ConnectionBuffer) sk.attachment ();
        if (cb != null) {
            cb.invalidate();
        }
        addToDropKeys (sk);
    }

    /**
     * Puts the key into {@link #dropKeys}, retrying for up to five seconds
     * while the put is refused. Failure is logged.
     */
    private static void addToDropKeys (SelectionKey sk) {
        long stop = System.currentTimeMillis() + 5000;
        synchronized (cSel.dropKeys) {
            boolean success = cSel.dropKeys.put(sk);
            while (!success) {
                long waitTime = stop - System.currentTimeMillis();
                if (waitTime <= 0)
                    break;
                try {
                    // wait() releases the monitor so the selector thread can
                    // pop entries; spinning here would keep it locked out.
                    cSel.dropKeys.wait(waitTime);
                } catch (InterruptedException ie) {
                    // best effort: keep retrying until the deadline
                }
                success = cSel.dropKeys.put(sk);
            }
            if (!success)
                Server.log("static CentralSelector", "dropKey: unable to add dropkey", Server.MSG_ERROR, Server.LVL_MAJOR);
            cSel.dropKeys.notify();
        }
    }

    /**
     * Checks the key using {@link #chkSk(SelectionKey)}. A failing key that
     * belongs to this selector is queued for dropping.
     * @param sk the key to check
     * @return true if the key passed the check
     */
    public static boolean isSkValid (SelectionKey sk) {
        if (chkSk(sk))
            return true;
        if (sk != null && cSel.equals(sk.selector()))
            dropKey (sk);
        return false;
    }

    /**
     * A key passes if it is non-null, carries a valid ConnectionBuffer, is
     * valid itself, its channel is open and neither direction of its socket
     * has been shut down. Any exception counts as failure.
     */
    private static boolean chkSk (SelectionKey sk) {
        if (sk == null)
            return false;
        try {
            ConnectionBuffer cb = (ConnectionBuffer) sk.attachment();
            if (cb == null || !cb.isValid())
                return false;
            if (!sk.isValid() || !sk.channel().isOpen()) {
                cb.invalidate();
                return false;
            }
            Socket s = ((SocketChannel) sk.channel()).socket();
            if (s.isInputShutdown() || s.isOutputShutdown()) {
                cb.invalidate();
                return false;
            }
            // checked a second time, as in the original code
            // NOTE(review): presumably guards against invalidation by another thread - confirm
            if (!cb.isValid())
                return false;
        } catch (Exception e) {
            Server.debug ("static CentralSelector", "SelectionKey-Check:", e, Server.MSG_ERROR, Server.LVL_MAJOR);
            return false;
        }
        return true;
    }

    /**
     * Puts the key into {@link #reqQueue} unless it is already contained,
     * retrying for up to one second while the put is refused. Afterwards an
     * additional RequestReader is requested if the queue size exceeds
     * (capacity / 1.5) / activeReaders.
     * @return true if the key is queued
     */
    private boolean addRequestToQueue(SelectionKey sk) {
        long stop = System.currentTimeMillis() + 1000;
        boolean success = false;
        try {
            synchronized (this.reqQueue) {
                if (reqQueue.contains(sk))
                    return true;
                success = reqQueue.put(sk);
                while (!success) {
                    // Read the clock once: wait(0) would block without
                    // timeout and a negative value throws.
                    long remaining = stop - System.currentTimeMillis();
                    if (remaining <= 0)
                        break;
                    this.reqQueue.wait(remaining);
                    success = reqQueue.put(sk);
                }
                if (success)
                    this.reqQueue.notify();
            }
        } catch (Exception e) {
            Server.debug (this, "addRequestToQueue caused exception:", e, Server.MSG_ERROR, Server.LVL_MAJOR);
        }
        if (reqQueue.size() > ((this.reqQueue.capacity()/1.5)/RequestReader.activeReaders()))
            RequestReader.startRequestReader(false);
        return success;
    }

    /**
     * Equal to itself and to the Selector it wraps; the latter is what
     * {@link #isSkValid(SelectionKey)} relies on.
     */
    public boolean equals (Object o) {
        return o == this || (o instanceof Selector && o.equals(sel));
    }

    public String toString() {
        return "[CentralSelector]";
    }

    /**
     * Periodically walks over all keys registered with the selector and
     * drops those whose keep-alive timeout has passed. Ends when the server
     * stops running or the selector is found closed.
     */
    private static class KeepAliveTimeoutChecker implements Runnable {
        private short loglvl = Server.LVL_VERY_VERBOSE;

        KeepAliveTimeoutChecker() { }

        public void run () {
            long nextCheck = 0;
            while (Server.srv.isRunning()) {
                long now = System.currentTimeMillis();
                if (nextCheck > now) {
                    long diff = Math.max(nextCheck - now, 33);
                    Server.log("KeepAliveCheck", "sleeping for " + diff + " millis", Server.MSG_STATE, loglvl);
                    try {
                        Thread.sleep(diff);
                    } catch (InterruptedException ie) { }
                    now = System.currentTimeMillis();
                }
                nextCheck = now + Server.srv.KEEP_ALIVE_TIMEOUT;
                SelectionKey[] checkArr;
                Server.log("KeepAliveCheck", "sync on selector", Server.MSG_STATE, loglvl);
                synchronized (CentralSelector.cSel.sel) {
                    if (!CentralSelector.cSel.sel.isOpen()) {
                        Server.log ("KeepAliveTimeoutChecker", "Selector closed. Shutting down KeepAliveTimeoutChecker", Server.MSG_STATE, Server.LVL_MINOR);
                        return;
                    }
                    Set keyset = CentralSelector.cSel.sel.keys();
                    Server.log("KeepAliveCheck", "sync on selectors keyset", Server.MSG_STATE, loglvl);
                    // snapshot the keys so the checks below run outside the locks
                    synchronized (keyset) {
                        checkArr = (SelectionKey[]) keyset.toArray(new SelectionKey[0]);
                    }
                }
                Server.log("KeepAliveCheck", "processing " + checkArr.length + "keys", Server.MSG_STATE, loglvl);
                for (int i = 0; i < checkArr.length; i++) {
                    SelectionKey sk = checkArr[i];
                    if (!sk.isValid() || !sk.channel().isOpen())
                        continue;
                    ConnectionBuffer cb = (ConnectionBuffer) sk.attachment();
                    if (cb == null)
                        continue; // nothing to synchronize on, nothing to time out
                    synchronized (cb) {
                        long kato = cb.getKeepAliveTimeout(now);
                        if (kato < 0) continue;
                        if (kato <= now) {
                            Server.log("KeepAliveCheck", "closing connection to " + cb.conn, Server.MSG_STATE, loglvl);
                            CentralSelector.dropKey(sk);
                        } else if (kato < nextCheck) {
                            // wake up in time for the earliest pending timeout
                            nextCheck = kato;
                        }
                    }
                }
                Server.log("KeepAliveCheck", "checking took me " + (System.currentTimeMillis()-now) + " millis", Server.MSG_STATE, loglvl);
            }
        }
    }
}