1 package org.sapia.ubik.net.mplex; 2 3 import org.sapia.ubik.rmi.server.Log; 4 5 import java.io.IOException ; 6 7 import java.net.InetAddress ; 8 import java.net.ServerSocket ; 9 import java.net.Socket ; 10 import java.net.SocketException ; 11 12 import java.util.ArrayList ; 13 import java.util.Date ; 14 import java.util.Iterator ; 15 import java.util.List ; 16 import org.sapia.ubik.util.Localhost; 17 18 19 72 public class MultiplexServerSocket extends ServerSocket implements Runnable { 73 76 public static final short DEFAULT_READ_AHEAD_BUFFER_SIZE = 64; 77 78 81 public static final short DEFAULT_ACCEPTOR_DAEMON_THREAD = 1; 82 83 86 public static final short DEFAULT_SELECTOR_DAEMON_THREAD = 1; 87 88 89 private List _theConnectors = new ArrayList (); 90 91 92 private SocketConnectorImpl _theDefaultConnector; 93 94 95 private List _theAcceptorDaemons = new ArrayList (); 96 97 98 private List _theSelectorDaemons = new ArrayList (); 99 100 101 private SocketQueue _theAcceptedQueue = new SocketQueue(); 102 103 104 private int _theAcceptorDaemonThread = DEFAULT_ACCEPTOR_DAEMON_THREAD; 105 106 107 private int _theSelectorDaemonThread = DEFAULT_SELECTOR_DAEMON_THREAD; 108 109 110 private int _theReadAheadBufferSize = DEFAULT_READ_AHEAD_BUFFER_SIZE; 111 112 118 public MultiplexServerSocket() throws IOException { 119 super(); 120 } 121 122 134 public MultiplexServerSocket(int port) throws IOException { 135 super(port, 50, Localhost.getLocalAddress()); 136 } 137 138 152 public MultiplexServerSocket(int port, int backlog) throws IOException { 153 super(port, backlog, Localhost.getLocalAddress()); 154 } 155 156 173 public MultiplexServerSocket(int port, int backlog, InetAddress bindAddr) 174 throws IOException { 175 super(port, backlog, bindAddr); 176 } 177 178 184 public int getReadAheadBufferSize() { 185 return _theReadAheadBufferSize; 186 } 187 188 195 public void setReadAheadBufferSize(int aSize) { 196 if (aSize <= 0) { 197 throw new IllegalArgumentException ("The size is less than zero"); 198 } else if (_theAcceptorDaemons.size() > 0) { 199 throw new IllegalStateException ( 200 "Cannot change the read ahead buffer size on a running server socket"); 201 } 202 203 _theReadAheadBufferSize = aSize; 204 } 205 206 211 public int getAcceptorDaemonThread() { 212 return _theAcceptorDaemonThread; 213 } 214 215 220 public int getSelectorDaemonThread() { 221 return _theSelectorDaemonThread; 222 } 223 224 230 public void setAcceptorDaemonThread(int maxThread) { 231 if (maxThread <= 0) { 232 throw new IllegalArgumentException ("The size is less than zero"); 233 } else if (_theAcceptorDaemons.size() > 0) { 234 throw new IllegalStateException ( 235 "Cannot change the number of acceptor daemons on a running server socket"); 236 } 237 238 _theAcceptorDaemonThread = maxThread; 239 } 240 241 247 public void setSelectorDaemonThread(int maxThread) { 248 if (maxThread <= 0) { 249 throw new IllegalArgumentException ("The size is less than zero"); 250 } else if (_theSelectorDaemons.size() > 0) { 251 throw new IllegalStateException ( 252 "Cannot change the number of selector daemons on a running server socket"); 253 } 254 255 _theSelectorDaemonThread = maxThread; 256 } 257 258 268 public synchronized MultiplexSocketConnector createSocketConnector( 269 StreamSelector aSelector) { 270 if (aSelector == null) { 271 throw new IllegalArgumentException ("The selector passed in is null"); 272 } else if (isClosed()) { 273 throw new IllegalStateException ( 274 "Could not create a socket connector, the server socket is closed"); 275 } 276 277 MultiplexSocketConnector aConnector = new SocketConnectorImpl(this, 278 aSelector, new SocketQueue()); 279 _theConnectors.add(aConnector); 280 281 return aConnector; 282 } 283 284 289 private synchronized void initializeDefaultConnector() { 290 if (_theDefaultConnector == null) { 291 _theDefaultConnector = new SocketConnectorImpl(this, 293 new PositiveStreamSelector(), new SocketQueue()); 294 295 for (int i = 1; i <= _theAcceptorDaemonThread; i++) { 297 Thread aDaemon = new Thread (new SelectorTask(), 298 "MultiplexServerSocket-Selector" + i); 299 aDaemon.setDaemon(true); 300 _theSelectorDaemons.add(aDaemon); 301 aDaemon.start(); 302 } 303 304 for (int i = 1; i <= _theAcceptorDaemonThread; i++) { 306 Thread aDaemon = new Thread (this, "MultiplexServerSocket-Acceptor" + i); 307 aDaemon.setDaemon(true); 308 _theAcceptorDaemons.add(aDaemon); 309 aDaemon.start(); 310 } 311 } 312 } 313 314 321 public synchronized void removeSocketConnector( 322 MultiplexSocketConnector aConnector) { 323 if (aConnector == null) { 324 throw new IllegalArgumentException ("The connector passed in is null"); 325 } 326 327 _theConnectors.remove(aConnector); 328 } 329 330 338 private byte[] extractHeader(MultiplexSocket aSocket, int aMaxLength) 339 throws IOException { 340 byte[] preview = new byte[aMaxLength]; 342 int length = aSocket.getPushbackInputStream().read(preview, 0, 343 preview.length); 344 345 aSocket.getPushbackInputStream().unread(preview, 0, length); 347 348 byte[] header; 350 351 if (length < preview.length) { 352 header = new byte[length]; 353 System.arraycopy(preview, 0, header, 0, length); 354 } else { 355 header = preview; 356 } 357 358 return header; 359 } 360 361 372 public Socket accept() throws IOException { 373 if (isClosed()) { 374 throw new SocketException ("Socket is closed"); 375 } else if (!isBound()) { 376 throw new SocketException ("Socket is not bound yet"); 377 } else if (_theDefaultConnector == null) { 378 initializeDefaultConnector(); 379 } 380 381 return _theDefaultConnector.getQueue().getSocket(); 382 } 383 384 390 public synchronized void close() throws IOException { 391 try { 392 super.close(); 393 } finally { 394 if (_theDefaultConnector != null) { 395 _theDefaultConnector.close(); 396 } 397 398 if (_theConnectors != null) { 400 ArrayList someConnectors = new ArrayList (_theConnectors); 402 403 for (Iterator it = someConnectors.iterator(); it.hasNext();) { 404 MultiplexSocketConnector aConnector = (MultiplexSocketConnector) it.next(); 405 aConnector.close(); 406 } 407 } 408 } 409 } 410 411 417 public String toString() { 418 StringBuffer aBuffer = new StringBuffer (); 419 aBuffer.append("MultiplexServerSocket[").append(super.toString()).append("]"); 420 421 return aBuffer.toString(); 422 } 423 424 431 public void run() { 432 try { 433 Log.warning(getClass(), 434 new Date () + " [" + Thread.currentThread().getName() + 435 "] MultiplexServerSocket * REPORT * Starting this acceptor thread"); 436 437 while (!isClosed() && !Thread.interrupted()) { 439 try { 440 MultiplexSocket aClient = new MultiplexSocket(null, 442 _theReadAheadBufferSize); 443 implAccept(aClient); 444 445 _theAcceptedQueue.add(aClient); 446 } catch (IOException ioe) { 447 _theDefaultConnector.getQueue().setException(ioe); 448 } 449 } 450 } catch (Exception e) { 451 Log.error(getClass(), 452 new Date () + " [" + Thread.currentThread().getName() + 453 "] MultiplexServerSocket * ERROR * An unhandled exception occured in this acceptor thread", 454 e); 455 } finally { 456 Log.warning(getClass(), 457 new Date () + " [" + Thread.currentThread().getName() + 458 "] MultiplexServerSocket * REPORT * Stopping this acceptor thread"); 459 } 460 } 461 462 472 private SocketConnectorImpl selectConnector(MultiplexSocket aClient) 473 throws IOException { 474 SocketConnectorImpl aConnector = null; 475 476 if (_theConnectors.size() > 0) { 477 byte[] header = extractHeader(aClient, _theReadAheadBufferSize); 479 480 synchronized (this) { 482 for (Iterator it = _theConnectors.iterator(); it.hasNext();) { 483 SocketConnectorImpl aCandidate = (SocketConnectorImpl) it.next(); 484 485 if (aCandidate.getSelector().selectStream(header)) { 486 aConnector = aCandidate; 487 488 break; 489 } 490 } 491 } 492 } 493 494 return aConnector; 495 } 496 497 500 public class SelectorTask implements Runnable { 501 506 public void run() { 507 try { 508 Log.warning(getClass(), 509 new Date () + " [" + Thread.currentThread().getName() + 510 "] MultiplexServerSocket * REPORT * Starting this selector thread"); 511 512 while (!isClosed() && !Thread.interrupted()) { 514 MultiplexSocket aSocket; 515 SocketConnectorImpl aConnector; 516 517 try { 518 aSocket = (MultiplexSocket) _theAcceptedQueue.getSocket(); 520 521 aConnector = selectConnector(aSocket); 523 524 if (aConnector == null) { 526 _theDefaultConnector.getQueue().add(aSocket); 527 } else { 528 aConnector.getQueue().add(aSocket); 529 } 530 } catch (IOException ioe) { 531 _theDefaultConnector.getQueue().setException(ioe); 532 } 533 } 534 } catch (Exception e) { 535 Log.error(new Date () + " [" + Thread.currentThread().getName() + 536 "] MultiplexServerSocket * ERROR * An unhandled exception occured in this selector thread", 537 e); 538 } finally { 539 Log.warning(getClass(), 540 new Date () + " [" + Thread.currentThread().getName() + 541 "] MultiplexServerSocket * REPORT * Stopping this selector thread"); 542 } 543 } 544 } 545 } 546 | Popular Tags |