1 package com.ubermq.kernel; 2 3 import java.net.*; 4 import java.nio.channels.*; 5 6 import java.io.IOException ; 7 import java.util.Iterator ; 8 import java.util.Set ; 9 import java.util.Arrays ; 10 11 15 public class AcceptThread extends Thread 16 { 17 private static final org.apache.log4j.Logger log = org.apache.log4j.Logger.getLogger(AcceptThread.class); 18 19 private ServerSocketChannel ssc; 20 private SelectionKey listenKey; 21 private Selector connectSelector; 22 23 private IConnectionInfo.ConnectionAcceptor a; 24 25 private IDatagramFactory factory; 26 private IMessageProcessor incomingProc; 27 28 36 public AcceptThread(Selector connectSelector, 37 IConnectionInfo.ConnectionAcceptor a, 38 SocketAddress bindAddress, 39 IDatagramFactory factory, 40 IMessageProcessor incomingProc) 41 throws IOException 42 { 43 super("Acceptor"); 44 45 this.connectSelector = connectSelector; 46 this.a = a; 47 this.factory = factory; 48 this.incomingProc = incomingProc; 49 50 ssc = ServerSocketChannel.open(); 51 ssc.configureBlocking(false); 52 ssc.socket().bind(bindAddress); 53 54 log.debug("Bound to " + bindAddress); 55 listenKey = ssc.register(this.connectSelector, SelectionKey.OP_ACCEPT); 56 } 57 58 68 public AcceptThread(Selector connectSelector, 69 IConnectionInfo.ConnectionAcceptor a, 70 int port, 71 IDatagramFactory factory, 72 IMessageProcessor incomingProc) 73 throws IOException 74 { 75 this(connectSelector, 76 a, 77 new InetSocketAddress(port), 78 factory, 79 incomingProc); 80 } 81 82 public void run() { 83 while(!isInterrupted()) { 84 try { 85 int n = connectSelector.select(); 86 87 if (n > 0) 88 acceptPendingConnections(); 89 } catch(Exception ex) { 90 log.error("", ex); 91 } 92 } 93 94 try 96 { 97 listenKey.cancel(); 98 ssc.socket().close(); 99 ssc.close(); 100 101 System.out.println(ssc.socket().isClosed()); 102 } 103 catch (IOException e) { 104 log.error("", e); 105 } 106 107 log.info("Server listener shut down."); 109 } 110 111 private void acceptPendingConnections() throws IOException { 112 Set readyKeys = connectSelector.selectedKeys(); 113 114 for(Iterator i = readyKeys.iterator(); i.hasNext(); ) { 115 SelectionKey key = (SelectionKey)i.next(); 116 i.remove(); 117 118 ServerSocketChannel readyChannel = (ServerSocketChannel)key.channel(); 119 120 SocketChannel incomingChannel = readyChannel.accept(); 121 incomingChannel.configureBlocking(false); 122 configureSocket(incomingChannel.socket()); 123 log.debug("AcceptThread: Connection from " + incomingChannel.socket().getInetAddress()); 124 125 SocketConnectionInfo conn = new SocketChannelConnectionInfo(incomingChannel, factory, incomingProc); 128 incomingProc.accept(conn); 129 a.acceptIncomingConnection(conn); 130 } 131 } 132 133 private void configureSocket(Socket s) 134 throws SocketException 135 { 136 s.setTcpNoDelay(true); 137 } 138 139 } 140 141 | Popular Tags |