package freecs.core;

import java.io.IOException;
import java.nio.BufferOverflowException;
import java.nio.ByteBuffer;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.channels.spi.SelectorProvider;
import java.util.Iterator;
import java.util.Set;

import freecs.Server;
import freecs.interfaces.IContainer;
import freecs.util.ObjectBuffer;

/**
 * Singleton thread that writes the queued output of each registered
 * connection to its socket channel.
 *
 * Channels are registered for {@link SelectionKey#OP_WRITE} on a private
 * selector via {@link #addToWrite(SocketChannel, ConnectionBuffer)}; the
 * thread then polls that selector and drains the write queue of every
 * connection whose key was selected.
 */
public class Responder extends Thread {
    /** The single instance of this thread. */
    public static final Responder res = new Responder();
    /** Write-queue marker: drop the connection through {@link #dropKey(SelectionKey)}. */
    public static final Object CLOSE_CONNECTION = new Object();
    /** Write-queue marker: cancel the key and close the channel directly. */
    public static final Object CLOSE_CONNECTION_IGNORE = new Object();

    /** Minimum time between two "loopstart" log entries when not in debug mode. */
    private static final long LOOP_LOG_INTERVAL_MS = 5000;
    /** Pause between two polls of the selector. */
    private static final long IDLE_SLEEP_MS = 33;

    private Selector sel;

    /**
     * Opens the selector used for write readiness. A failure is reported
     * through {@code Server.debug} with {@code Server.LVL_HALT}.
     */
    private Responder() {
        try {
            sel = SelectorProvider.provider().openSelector();
        } catch (Exception e) {
            Server.debug (this, "Unable to start Responder!", e, Server.MSG_ERROR, Server.LVL_HALT);
        }
    }

    /**
     * Starts the singleton thread if it is not alive yet and raises its
     * priority to {@link Thread#MAX_PRIORITY}.
     */
    public static void startResponder() {
        if (!res.isAlive()) {
            res.setName("Responder");
            res.start();
        }
        res.setPriority(MAX_PRIORITY);
    }

    /**
     * Registers the channel for write readiness with this responder's
     * selector, attaching the given buffer to the new key.
     *
     * Nothing happens when the buffer is not valid or when the channel
     * already has a key for this selector (an invalid existing key is
     * logged). If the channel turns out to be closed, it is handed to
     * {@code CentralSelector.dropChannel}.
     *
     * @param sc the channel to register
     * @param cb the connection buffer to attach to the selection key
     */
    public void addToWrite(SocketChannel sc, ConnectionBuffer cb) {
        if (!cb.isValid()) {
            return;
        }
        SelectionKey sk = sc.keyFor(sel);
        if (sk != null) {
            if (!CentralSelector.isSkValid(sk)) {
                Server.log(this, "addToWrite: tryed to add invalid key", Server.MSG_STATE, Server.LVL_VERBOSE);
            }
            // already registered (or registration no longer usable): nothing to add
            return;
        }
        try {
            sc.register(sel, SelectionKey.OP_WRITE, cb);
        } catch (ClosedChannelException cce) {
            Server.debug (this, "addToWrite: Channel closed", cce, Server.MSG_ERROR, Server.LVL_VERY_VERBOSE);
            // There is no key on this path (sk is null), so dropKey(sk) would
            // be a no-op; drop the channel itself instead.
            CentralSelector.dropChannel(sc);
        }
    }

    /**
     * Main loop: polls the selector without blocking, flushes the write
     * queue of every selected connection and then pauses briefly. Runs
     * while the server is running or the CentralSelector thread is alive.
     *
     * An exception raised while handling the selected keys is logged and
     * ends the current pass; the loop itself keeps going.
     */
    public void run() {
        long lastMessage = 0;
        while (Server.srv.isRunning() || CentralSelector.cSel.isAlive()) {
            try {
                long now = System.currentTimeMillis();
                // log every pass in debug mode, otherwise at most once per interval
                if (Server.DEBUG || now - lastMessage > LOOP_LOG_INTERVAL_MS) {
                    Server.log (this, "loopstart", Server.MSG_STATE, Server.LVL_VERY_VERBOSE);
                    lastMessage = now;
                }
                if (selectReadyKeys() > 0) {
                    Set ks = sel.selectedKeys();
                    if (ks != null) {
                        for (Iterator i = ks.iterator(); i.hasNext(); ) {
                            SelectionKey ck = (SelectionKey) i.next();
                            i.remove();
                            if (!CentralSelector.isSkValid(ck)) {
                                Server.log (this, "run: current request has invalid key", Server.MSG_STATE, Server.LVL_VERBOSE);
                                dropKey(ck);
                                continue;
                            }
                            flushWriteQueue(ck);
                        }
                    }
                }
                pause();
            } catch (Exception e) {
                Server.debug (this, "run: ", e, Server.MSG_ERROR, Server.LVL_MAJOR);
            }
        }
    }

    /**
     * Performs a non-blocking select.
     *
     * @return the value of {@link Selector#selectNow()}, or 0 when the
     *         select failed (the failure is logged)
     */
    private int selectReadyKeys() {
        try {
            return sel.selectNow();
        } catch (CancelledKeyException cke) {
            Server.log(this, "CancelledKeyException while selectNow()", Server.MSG_ERROR, Server.LVL_VERBOSE);
        } catch (IOException ioe) {
            Server.debug(this, "run: ", ioe, Server.MSG_ERROR, Server.LVL_VERBOSE);
        } catch (Exception e) {
            Server.debug(this, "run: ", e, Server.MSG_ERROR, Server.LVL_MAJOR);
        }
        return 0;
    }

    /**
     * Writes the queued entries of the connection attached to the given key
     * until the queue is empty, an entry could not be written completely,
     * or an entry requests that the connection be closed.
     *
     * An entry is only popped from the queue after it has been written
     * completely, so a partially written entry is looked at again on a
     * later pass (presumably {@code ObjectBuffer.get()} does not remove
     * the entry; verify against ObjectBuffer).
     *
     * @param ck a valid selection key whose attachment is a ConnectionBuffer
     * @throws IOException if closing the channel fails
     */
    private void flushWriteQueue(SelectionKey ck) throws IOException {
        ConnectionBuffer cb = (ConnectionBuffer) ck.attachment();
        SocketChannel sc = (SocketChannel) ck.channel();
        ObjectBuffer ob = cb.getWriteQueue();
        while (true) {
            Object o;
            synchronized (ob) {
                if (ob.isEmpty()) {
                    return;
                }
                o = ob.get();
                if (o == null) {
                    Server.log(this, "WriteQueue contained null", Server.MSG_ERROR, Server.LVL_VERBOSE);
                    ob.pop();
                    continue;
                }
            }
            if (o == CLOSE_CONNECTION_IGNORE) {
                ck.cancel();
                ck.channel().close();
                return;
            }
            if (o == CLOSE_CONNECTION) {
                dropKey(ck);
                return;
            }
            synchronized (cb) {
                cb.updateKeepAliveTimeout();
            }
            ByteBuffer bb = o instanceof ByteBuffer
                    ? (ByteBuffer) o
                    : ((IContainer) o).getByteBuffer();
            if (!writeContent(sc, bb)) {
                // bytes remain in the buffer; leave the entry queued
                return;
            }
            if (o instanceof IContainer && ((IContainer) o).closeSocket()) {
                dropKey(ck);
                return;
            }
            synchronized (ob) {
                ob.pop();
            }
        }
    }

    /**
     * Sleeps for the polling interval. An interrupt only shortens the pause:
     * it is deliberately not re-asserted, because an interrupted thread
     * would make every following sleep return immediately and turn the
     * polling loop into a busy loop.
     */
    private void pause() {
        try {
            Thread.sleep(IDLE_SLEEP_MS);
        } catch (InterruptedException ie) {
            // intentionally ignored, see method comment
        }
    }

    /**
     * Cancels the given key and hands its channel to
     * {@code CentralSelector.dropChannel}. Does nothing for {@code null}.
     *
     * @param sk the key to drop, may be {@code null}
     */
    private void dropKey (SelectionKey sk) {
        if (sk == null) {
            return;
        }
        synchronized (sk) {
            sk.cancel();
        }
        SocketChannel sc = (SocketChannel) sk.channel();
        CentralSelector.dropChannel(sc);
    }

    /**
     * Cancels the key the given channel holds for this responder's
     * selector, if there is one.
     *
     * @param sc the channel whose write registration is to be cancelled
     */
    public void dropChannel (SocketChannel sc) {
        SelectionKey sk = sc.keyFor(sel);
        if (sk != null) {
            synchronized (sk) {
                sk.cancel();
            }
        }
    }

    /**
     * Writes the remaining content of the buffer to the channel.
     *
     * NOTE(review): an IOException drops the channel and still returns
     * {@code true}, so the caller goes on with the next queued entry on a
     * dropped channel. This looks like it is meant to let the caller move
     * past the failed entry; confirm before changing.
     *
     * @param sc the channel to write to
     * @param bb the buffer holding the bytes to write
     * @return {@code false} if bytes remain in the buffer after the write;
     *         {@code true} if the buffer was written completely, the write
     *         reported a negative count, or the write failed with an
     *         IOException
     */
    private boolean writeContent (SocketChannel sc, ByteBuffer bb) {
        int written = 0;
        try {
            written = sc.write (bb);
        } catch (IOException ioe) {
            Server.log(this, "writeContent: remote host has closed connection", Server.MSG_TRAFFIC, Server.LVL_VERY_VERBOSE);
            CentralSelector.dropChannel(sc);
            return true;
        } catch (BufferOverflowException boe) {
            Server.log(this, "writeContent: bufferoverflowexception which should not apear acording to apidoc of sun...", Server.MSG_STATE, Server.LVL_MAJOR);
        }
        if (written < 0) {
            return true;
        }
        return bb.remaining() <= 0;
    }

    /**
     * @return the fixed label {@code "[Responder]"}
     */
    public String toString() {
        return "[Responder]";
    }
}