1 package org.sapia.ubik.net.nio.acceptor; 2 3 import java.io.IOException ; 4 import java.nio.channels.Channel ; 5 import java.nio.channels.ClosedByInterruptException ; 6 import java.nio.channels.SelectionKey ; 7 import java.nio.channels.Selector ; 8 import java.util.Iterator ; 9 10 import org.sapia.ubik.net.NestedIOException; 11 import org.sapia.ubik.net.nio.ChannelManager; 12 import org.sapia.ubik.net.nio.Cycle; 13 import org.sapia.ubik.net.nio.CycleListener; 14 import org.sapia.ubik.net.nio.Dispatcher; 15 import org.sapia.ubik.net.nio.dispatcher.DefaultDispatcher; 16 import org.sapia.ubik.util.Debug; 17 import org.sapia.ubik.util.StartStopLock; 18 19 34 public class Acceptor { 35 36 private String _threadNamePrefix = "sapia.ubik.nio.acceptor"; 37 private boolean _started, _daemon; 38 private SelectorThread _selectorThread; 39 private ServerThread _serverThread; 40 private AcceptorConfig _config; 41 private CycleListener _listener; 42 43 public Acceptor(ChannelManager manager) { 44 _config = new AcceptorConfig(manager, new DefaultDispatcher()); 45 } 46 47 public Acceptor(ChannelManager manager, Dispatcher dispatcher) { 48 _config = new AcceptorConfig(manager, dispatcher); 49 } 50 51 55 public void setBufferSize(int bufSize){ 56 _config.pool.setBufSize(bufSize); 57 } 58 59 63 public void setDaemon(boolean daemon) { 64 _daemon = daemon; 65 } 66 67 72 public void setThreadNamePrefix(String prefix) { 73 if(prefix != null) 74 _threadNamePrefix = prefix; 75 } 76 77 81 public void setDebug(Debug debug) { 82 _config.debug = debug; 83 } 84 85 90 public Object getAddress(){ 91 if(_config.server == null){ 92 throw new IllegalStateException ("Server not started"); 93 } 94 95 int maxAttempts = 3; 96 int attempt = 0; 97 while(attempt<maxAttempts){ 98 Object address = _config.manager.getAddress(_config.server); 99 if(address != null){ 100 return address; 101 } 102 try{ 103 Thread.sleep(2000); 104 }catch(InterruptedException e){ 105 throw new IllegalStateException ("Thread interrupted"); 106 } 107 attempt++; 108 } 109 throw new IllegalStateException ("Could not acquire server address; server startup might have failed"); 110 } 111 112 115 public Channel getChannel(){ 116 if(_config.server == null){ 117 throw new IllegalStateException ("Server not started"); 118 } 119 120 return _config.server; 121 } 122 123 129 public synchronized void start() throws IOException { 130 131 Selector selector = Selector.open(); 132 _config.debug.out(getClass(), "Creating server channel"); 133 Channel channel = _config.manager.create(); 134 135 _serverThread = new ServerThread(); 136 _serverThread.setDaemon(_daemon); 137 _serverThread.setName(_threadNamePrefix + ".server"); 138 _serverThread._parent = this; 139 140 _selectorThread = new SelectorThread(); 141 _selectorThread.setDaemon(_daemon); 142 _selectorThread.setName(_threadNamePrefix + ".selector"); 143 _selectorThread._parent = this; 144 145 StateHandler[] handlers = new StateHandler[6]; 146 147 handlers[Cycle.STATE_READ] = new ReadStateHandler(_config); 148 handlers[Cycle.STATE_WRITE] = new WriteStateHandler(_config); 149 handlers[Cycle.STATE_PROCESS] = new ProcessStateHandler(_config); 150 handlers[Cycle.STATE_RECYCLE] = new RecycleStateHandler(_config); 151 handlers[Cycle.STATE_COMPLETE] = new CompleteStateHandler(_config); 152 handlers[Cycle.STATE_ERROR] = new ErrorStateHandler(_config); 153 154 _config.init(channel, selector, handlers); 155 156 _listener = new AcceptorCycleListener(_config); 157 158 _serverThread.start(); 159 try { 160 _serverThread._lock.waitStarted(); 161 } catch(InterruptedException e) { 162 throw new IOException ("Server thread interrupted at startup"); 163 } catch(IOException e) { 164 throw e; 165 } catch(Throwable err) { 166 throw new NestedIOException("Error caught while starting server", err); 167 } 168 169 _selectorThread.start(); 170 try { 171 _selectorThread._lock.waitStarted(); 172 } catch(InterruptedException e) { 173 throw new IOException ("Selector thread interrupted at startup"); 174 } catch(IOException e) { 175 throw e; 176 } catch(Throwable err) { 177 throw new NestedIOException("Error caught while starting selector", err); 178 } 179 180 _config.debug.out(getClass(), "Server started"); 181 } 182 183 186 public synchronized void stop() { 187 _config.debug.out(getClass(), "Stopping server..."); 188 try { 189 _serverThread._lock.triggerStop(); 190 _serverThread._lock.waitStopped(); 191 } catch(Throwable t) { 192 } 193 try { 194 _selectorThread._lock.triggerStop(); 195 _selectorThread._lock.waitStopped(); 196 } catch(Throwable t) { 197 } 198 _config.debug.out(getClass(), "Server stopped"); 199 } 200 201 203 static final class ServerThread extends Thread implements 204 StartStopLock.StopRequestListener { 205 206 Acceptor _parent; 207 StartStopLock _lock = new StartStopLock(this); 208 209 public void run() { 210 Debug debug = _parent._config.debug; 211 ChannelManager manager = _parent._config.manager; 212 Channel server = _parent._config.server; 213 Channel client; 214 while(!_lock.stopRequested && !interrupted()) { 215 try { 216 _lock.notifyStarted(null); 217 client = manager.accept(server); 218 if(client != null) { 219 AcceptorCycle cycle = new AcceptorCycle(manager, client, 220 _parent._config.pool, _parent._listener); 221 _parent._config.queue.add(cycle, false); 222 } 223 } catch(ClosedByInterruptException e) { 224 break; 225 } catch(IOException e) { 226 debug.out(getClass(), "IOException caught", e); 227 break; 228 } catch(Exception e) { 229 debug.out(getClass(), "Exception caught", e); 230 } 231 } 232 if(_lock.stopRequested) { 233 try { 234 server.close(); 235 _parent._config.dispatcher.close(); 236 } catch(IOException e2) { 237 } 238 _lock.notifyStopped(null); 239 } 240 } 241 242 245 public void onStopRequested() throws Throwable { 246 interrupt(); 247 } 248 249 } 250 251 static final class SelectorThread extends Thread implements 252 StartStopLock.StopRequestListener { 253 254 Acceptor _parent; 255 Exception _err; 256 StartStopLock _lock = new StartStopLock(this); 257 258 public void run() { 259 while(true){ 260 try{ 261 doRun(); 262 }catch(RuntimeException e){ 263 } 264 if(interrupted() || _lock.stopRequested){ 265 break; 266 } 267 } 268 } 269 270 private void doRun(){ 271 Debug debug = _parent._config.debug; 272 int selected, state; 273 SelectionKey key = null; 274 AcceptorCycle cycle = null; 275 276 while(!_lock.stopRequested && !isInterrupted()) { 277 selected = 0; 278 try { 279 _lock.notifyStarted(null); 280 selected = _parent._config.selector.select(); 281 } catch(IOException e) { 282 debug.out(getClass(), "Exception caught while selecting", e); 283 doStop(); 284 break; 285 } 286 287 if(_parent._config.queue.wasItemAdded()) { 288 try { 289 _parent._config.queue.register(); 290 } catch(IOException e) { 291 debug.out(getClass(), "Could not register pending clients", e); 292 } 293 } 294 295 if(selected > 0) { 296 Iterator itr = _parent._config.selector.selectedKeys().iterator(); 297 while(itr.hasNext()) { 298 key = (SelectionKey ) itr.next(); 299 cycle = (AcceptorCycle) key.attachment(); 300 if(cycle == null) { 301 handleError(cycle, new IllegalStateException ( 302 "Cycle instance not attached to key")); 303 continue; 304 } 305 key.cancel(); 306 try { 307 _parent._config.handlers[cycle.state()].handle(cycle); 308 } catch(HandlerException e) { 309 debug.out(getClass(), e.getMessage(), e); 310 handleError((AcceptorCycle) key.attachment(), e); 311 } 312 } 313 } 314 } 315 _lock.notifyStopped(null); 316 } 317 318 321 public void onStopRequested() throws Throwable { 322 interrupt(); 323 _parent._config.selector.wakeup(); 324 } 325 326 private synchronized void doStop() { 327 if(_parent._config.selector != null) { 328 Iterator keys = _parent._config.selector.keys().iterator(); 329 SelectionKey key; 330 while(keys.hasNext()) { 331 key = (SelectionKey ) keys.next(); 332 Object attached = key.attachment(); 333 if(attached instanceof AcceptorCycle) { 334 Cycle cycle = (Cycle) key.attachment(); 335 cycle.destroy(); 336 } 337 } 338 try { 339 _parent._config.selector.close(); 340 } catch(IOException e) { 341 } 342 } 343 } 344 345 private void handleError(Cycle cycle, Throwable t) { 346 if(cycle != null) { 347 cycle.destroy(); 348 } 349 } 350 } 351 352 } 353 | Popular Tags |