package transport.channel;

import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import transport.GetLocalAddressCommand;
import transport.GetLocalAddressResponse;
import transport.LocalAddress;
import transport.PacketTransportCommand;
import transport.PacketTransportStatistics;
import transport.PacketTransportException;
import transport.Terminated;

import jegg.EggBase;
import jegg.PortException;
import jegg.Port;

/**
 * Owns the listening TCP server socket of the packet transport.
 * <p>
 * The manager binds a non-blocking {@link ServerSocketChannel} to the address
 * delivered in a {@link LocalAddress} message, then polls for incoming
 * connections by repeatedly sending itself an <code>AcceptClient</code>
 * message.  Every accepted {@link SocketChannel} is forwarded to the
 * channel-list port wrapped in an <code>AddChannelCommand</code>.
 */
public class PortManager extends EggBase
{
    private static final Log LOG = LogFactory.getLog(PortManager.class);

    /** Longest time, in milliseconds, one accept poll blocks in the selector. */
    private static final long MAX_WAIT_MSEC = 500;

    /** Last address configuration received, returned by GetLocalAddressCommand. */
    private LocalAddress _localAddress;
    /** True while the server socket is bound and registered with the selector. */
    private boolean _open = false;
    /** Address the server socket is (to be) bound to. */
    private InetAddress _tcpIP;
    /** TCP port the server socket is (to be) bound to; -1 until configured. */
    private int _tcpPort = -1;
    /** Port of the sender of the LocalAddress message; receives PortManagerReady. */
    private Port _fromPort = null;
    /** Number of client connections accepted so far. */
    private long _noClients = 0;
    /** Wall-clock time (ms) at which the port was last opened successfully. */
    private long _timeStarted = 0;

    private ServerSocketChannel _socketChannel;
    private Selector _selector;
    /** Port that accepted channels are sent to. */
    private Port _channelListPort;

    /**
     * Creates an unconfigured manager; nothing is bound until a
     * {@link LocalAddress} message is handled.
     */
    public PortManager()
    {
        super();
    }

    /**
     * Initialization hook; this egg needs no initialization.
     */
    public void init()
    {
    }

    /**
     * Records the port to which accepted client channels are sent.
     *
     * @param p the channel-list port.
     */
    public void handle(Port p)
    {
        _channelListPort = p;
    }

    /**
     * Fallback handler: logs any message type that has no dedicated handler.
     *
     * @param m the unexpected message.
     */
    public void handle(Object m)
    {
        LOG.warn("Unexpected message: " + m);
    }

    /**
     * Applies a new local address configuration and, when the channel-list
     * port is already known, (re)opens the listening port.
     * <p>
     * A missing IP falls back to the local host address.  A non-positive port
     * is rejected by sending a {@link PacketTransportException} to this egg's
     * own port.
     *
     * @param config the address and port to listen on.
     */
    public void handle(LocalAddress config)
    {
        if (LOG.isInfoEnabled())
            LOG.info("handle(LocalAddress)");

        _localAddress = config;
        InetAddress newAddr = config.getLocalIP();

        if (null == newAddr)
        {
            try
            {
                newAddr = InetAddress.getLocalHost();
            }
            catch (UnknownHostException e)
            {
                LOG.error("Unable to open network connection: ", e);
                return;
            }
        }

        int newPort = config.getLocalPort();

        if (0 >= newPort)
        {
            String s = "Invalid port: " + newPort;
            LOG.error(s);
            try
            {
                getContext().getPort().send(getContext().createMessage(new PacketTransportException(s)));
            }
            catch (PortException e)
            {
                LOG.error("Failed to send packet-transport-exception", e);
            }
        }
        else
        {
            _tcpIP = newAddr;
            _tcpPort = newPort;
            _fromPort = getContext().getFromPort();
            if (null != _channelListPort)
            {
                open();
            }
            else
            {
                // Previously a silent no-op; make the skipped open visible.
                // NOTE(review): nothing opens the port later when the
                // channel-list port arrives - confirm the delivery order.
                LOG.warn("Channel list port not set; listening port not opened");
            }
        }
    }

    /**
     * Performs one accept poll and schedules the next one.
     * <p>
     * Waits up to {@link #MAX_WAIT_MSEC} for a client, forwards an accepted
     * channel to the channel-list port, and then re-sends the same message to
     * this egg so that polling continues.  Polling stops once the port has
     * been closed; {@link #open()} starts a new cycle.
     *
     * @param ac the poll trigger message; re-sent unchanged.
     */
    public void handle(AcceptClient ac)
    {
        if (LOG.isDebugEnabled())
            LOG.debug("handle(AcceptClient)");

        if (!_open)
        {
            // The selector and channel are released on close, so polling
            // must not continue (and must not spin forever after TERMINATE).
            // NOTE(review): if the port is re-opened before a stale
            // AcceptClient is processed, two poll cycles may coexist - confirm.
            LOG.debug("Port is closed; accept polling stopped");
            return;
        }

        SocketChannel c = null;
        try
        {
            c = waitForClient(MAX_WAIT_MSEC);

            if (null != c)
            {
                LOG.info("Sending new channel to ChannelList");
                _channelListPort.send(getContext().createMessage(new AddChannelCommand(c)));
            }
        }
        catch (IOException e)
        {
            LOG.error("Unable to accept client connection", e);
        }
        catch (PortException e)
        {
            LOG.error("Failed to send new channel to channel list", e);
            // Nobody else holds the channel, so close it rather than leak it.
            closeQuietly(c);
        }

        try
        {
            getContext().getPort().send(getContext().createMessage(ac));
        }
        catch (PortException e1)
        {
            LOG.error("Unable to resend AcceptClient message", e1);
        }
    }

    /**
     * Responds with the most recently received {@link LocalAddress}.
     *
     * @param cmd the request.
     */
    public void handle(GetLocalAddressCommand cmd)
    {
        getContext().respond(new GetLocalAddressResponse(_localAddress));
    }

    /**
     * Dispatches a transport command: configuration query, termination, or
     * statistics query.  Unknown commands are logged and ignored.
     *
     * @param cmd the command to execute.
     */
    public void handle(PacketTransportCommand cmd)
    {
        if (LOG.isDebugEnabled())
            LOG.debug("handle(PacketTransportCommand)");

        if (PacketTransportCommand.QUERY_CONFIG.equals(cmd))
        {
            getContext().respond(new LocalAddress(_tcpIP, _tcpPort));
        }
        else if (PacketTransportCommand.TERMINATE.equals(cmd))
        {
            closePort();
            getContext().respond(new Terminated());
        }
        else if (PacketTransportCommand.QUERY_STATS.equals(cmd))
        {
            PacketTransportStatistics stats = new PacketTransportStatistics();
            stats.port = _tcpPort;
            stats.numberClientsAccepted = _noClients;
            stats.timeStarted = _timeStarted;
            getContext().respond(stats);
        }
        else
        {
            LOG.warn("Unrecognized command: " + cmd);
        }
    }

    /**
     * Opens the listening port using the currently configured address, port
     * and response port.
     */
    private void open()
    {
        open(_tcpIP, _tcpPort, _fromPort);
    }

    /**
     * Binds the listening port, announces readiness and starts accept polling.
     * <p>
     * On success a <code>PortManagerReady</code> is sent to
     * <code>responsePort</code> and the first <code>AcceptClient</code> is
     * posted to this egg.  On failure a {@link PacketTransportException} is
     * sent as the response to the message currently being handled.
     *
     * @param addr         address to bind to.
     * @param port         TCP port to bind to.
     * @param responsePort port that receives the ready notification.
     */
    private void open(InetAddress addr, int port, Port responsePort)
    {
        try
        {
            openPort(addr, port);
            getContext().respond(responsePort, new PortManagerReady());
            try
            {
                getContext().getPort().send(getContext().createMessage(new AcceptClient()));
            }
            catch (PortException e1)
            {
                LOG.error("Failed to send initial accept-client", e1);
            }
            _timeStarted = System.currentTimeMillis();
        }
        catch (IOException e)
        {
            String s = "Unable to open port: " + port;
            LOG.error(s, e);
            getContext().respond(new PacketTransportException(s, e));
        }
    }

    /**
     * Closes the listening port and releases the server channel and selector.
     * Safe to call when nothing is open.
     */
    private void closePort()
    {
        releaseResources();
        _open = false;
    }

    /**
     * Closes and forgets the server channel and the selector, whichever exist.
     * Close failures are logged, never thrown, so that both resources are
     * always attempted.
     */
    private void releaseResources()
    {
        if (null != _socketChannel)
        {
            try
            {
                _socketChannel.close();
            }
            catch (IOException e)
            {
                LOG.error("Unexpected error while closing server socket: ", e);
            }
            _socketChannel = null;
        }

        if (null != _selector)
        {
            try
            {
                _selector.close();
            }
            catch (IOException e)
            {
                LOG.error("Unexpected error while closing selector: ", e);
            }
            _selector = null;
        }
    }

    /**
     * Closes a client channel that could not be handed off, logging (not
     * throwing) any failure.
     *
     * @param c the channel to close; may be null.
     */
    private void closeQuietly(SocketChannel c)
    {
        if (null == c)
        {
            return;
        }
        try
        {
            c.close();
        }
        catch (IOException e)
        {
            LOG.error("Unexpected error while closing client channel: ", e);
        }
    }

    /**
     * Closes any previously open port, then binds a new non-blocking server
     * channel to the given address and registers it for accept events.
     * <p>
     * If any step fails, everything created so far is released again before
     * the exception propagates.
     *
     * @param addr address to bind to.
     * @param p    TCP port to bind to.
     * @throws IOException if the channel or selector cannot be opened, bound
     *                     or registered.
     */
    private void openPort(InetAddress addr, int p) throws IOException
    {
        closePort();
        _tcpPort = p;

        boolean opened = false;
        try
        {
            _socketChannel = ServerSocketChannel.open();
            _socketChannel.socket().setReuseAddress(true);
            InetSocketAddress sa = new InetSocketAddress(addr, p);
            _socketChannel.socket().bind(sa);
            if (LOG.isInfoEnabled())
            {
                LOG.info("Port bound: " + Integer.toString(p));
            }
            _socketChannel.configureBlocking(false);
            _selector = Selector.open();
            _socketChannel.register(_selector, SelectionKey.OP_ACCEPT);
            opened = true;
        }
        finally
        {
            if (!opened)
            {
                // Covers checked and unchecked failures alike (e.g. bind error).
                releaseResources();
            }
        }

        LOG.debug("Port opened");
        _open = true;
    }

    /**
     * Waits for at most <code>maxWait_msec</code> milliseconds for a client to
     * connect and accepts it.
     *
     * @param maxWait_msec longest time to block in the selector, in milliseconds.
     * @return the accepted client channel, or null if none arrived in time.
     * @throws IOException if selecting, accepting or re-registering fails.
     */
    private SocketChannel waitForClient(long maxWait_msec) throws IOException
    {
        SocketChannel ch = null;

        _selector.select(maxWait_msec);
        Set selectedKeys = _selector.selectedKeys();
        for (Iterator it = selectedKeys.iterator(); it.hasNext(); )
        {
            SelectionKey key = (SelectionKey) it.next();
            it.remove();
            key.cancel();
            // Flush the cancelled key so the re-registration below does not
            // run into a CancelledKeyException.
            _selector.selectNow();
            try
            {
                ch = _socketChannel.accept();
                // A non-blocking accept may return null; only report real clients.
                if (null != ch)
                {
                    LOG.info("Got client");
                }
            }
            finally
            {
                // Re-register even when accept fails; otherwise the listener
                // would never be selected again.
                _socketChannel.register(_selector, SelectionKey.OP_ACCEPT);
            }
        }

        if (null != ch)
        {
            _noClients++;
        }
        return ch;
    }
}