KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > transport > channel > PortManager


1
2 package transport.channel;
3
4 import java.io.IOException JavaDoc;
5 import java.net.InetAddress JavaDoc;
6 import java.net.InetSocketAddress JavaDoc;
7 import java.net.UnknownHostException JavaDoc;
8 import java.nio.channels.SelectionKey JavaDoc;
9 import java.nio.channels.Selector JavaDoc;
10 import java.nio.channels.ServerSocketChannel JavaDoc;
11 import java.nio.channels.SocketChannel JavaDoc;
12 import java.util.Iterator JavaDoc;
13 import java.util.Set JavaDoc;
14
15 import org.apache.commons.logging.Log;
16 import org.apache.commons.logging.LogFactory;
17
18 import transport.GetLocalAddressCommand;
19 import transport.GetLocalAddressResponse;
20 import transport.LocalAddress;
21 import transport.PacketTransportCommand;
22 import transport.PacketTransportStatistics;
23 import transport.PacketTransportException;
24 import transport.Terminated;
25
26
27 import jegg.EggBase;
28 import jegg.PortException;
29 import jegg.Port;
30
31
32 /**
33  * PortManager manages the list of open ports that the application will
34  * accept remote connections on. A possibliy interesting aspect of this
35  * egg's implementation is how it sends a message to itself to trigger
36  * the check for new client connections.
37  */

38 public class PortManager extends EggBase
39 {
40     /** Class logger */
41     private static final Log LOG = LogFactory.getLog(PortManager.class);
42     /** */
43     private static final long MAX_WAIT_MSEC = 500;
44     
45     private LocalAddress _localAddress;
46     private boolean _open = false;
47     private InetAddress JavaDoc _tcpIP;
48     private int _tcpPort = -1;
49     private Port _fromPort = null;
50     private long _noClients = 0;
51     private long _timeStarted = 0;
52     
53     private ServerSocketChannel JavaDoc _socketChannel;
54     private Selector JavaDoc _selector;
55     private Port _channelListPort;
56     /**
57      * Create a port manager assigned to the default dispatcher.
58      */

59     public PortManager()
60     {
61         super();
62     }
63     
64     public void init()
65     {
66         // EMPTY
67
}
68     
69     // ---------------------------------------------------------------------
70
// Message Handlers
71
// ---------------------------------------------------------------------
72

73     public void handle(Port p)
74     {
75         _channelListPort = p;
76     }
77     
78     /* (non-Javadoc)
79      * @see egg.Egg#handle(java.lang.Object)
80      */

81     public void handle(Object JavaDoc m)
82     {
83         LOG.warn("Unexpected message: " + m);
84     }
85     
86     /**
87      * Configure the TCP port used to accept incoming connections.
88      */

89     public void handle(LocalAddress config)
90     {
91         if (LOG.isInfoEnabled())
92             LOG.info("handle(LocalAddress)");
93         
94         _localAddress = config;
95         InetAddress JavaDoc newAddr = config.getLocalIP();
96         
97         if (null == newAddr)
98         {
99             try
100             {
101                 newAddr = InetAddress.getLocalHost();
102             }
103             catch (UnknownHostException JavaDoc e)
104             {
105                 LOG.error("Unable to open network connection: ", e);
106                 return;
107             }
108         }
109         
110         int newPort = config.getLocalPort();
111         
112         if (0 >= newPort)
113         {
114             String JavaDoc s = "Invalid port: " + newPort;
115             LOG.error(s);
116             try
117             {
118                 getContext().getPort().send(getContext().createMessage(new PacketTransportException(s)));
119             }
120             catch (PortException e)
121             {
122                 LOG.error("Failed to send packet-transport-exception", e);
123             }
124         }
125         else
126         {
127             // If we haven't received the channel list's egg port
128
// yet, then save the configuration info till we get it.
129
_tcpIP = newAddr;
130             _tcpPort = newPort;
131             _fromPort = getContext().getFromPort();
132             if (null != _channelListPort) {open();}
133         }
134     }
135
136     /**
137      * Check for new clients. This message is used internally -
138      * it's not sent by external eggs.
139      */

140     public void handle(AcceptClient ac)
141     {
142         if (LOG.isDebugEnabled())
143             LOG.debug("handle(AcceptClient)");
144         
145         try
146         {
147             SocketChannel JavaDoc c = waitForClient(MAX_WAIT_MSEC);
148
149             if (null != c)
150             {
151                 LOG.info("Sending new channel to ChannelList");
152                 _channelListPort.send(getContext().createMessage(new AddChannelCommand(c)));
153             }
154         }
155         catch (IOException JavaDoc e)
156         {
157             LOG.error("Unable to accept client connection", e);
158         }
159         catch (PortException e)
160         {
161             LOG.error("Failed to send new channel to channel list", e);
162         }
163         
164         try
165         {
166             getContext().getPort().send(getContext().createMessage(ac));
167         }
168         catch (PortException e1)
169         {
170             LOG.error("Unable to resend AcceptClient message", e1);
171         }
172     }
173
174     /**
175      * Handle query of the current configuration.
176      */

177     public void handle(GetLocalAddressCommand cmd)
178     {
179         getContext().respond(new GetLocalAddressResponse(_localAddress));
180     }
181     
182     /**
183      * Handle managment commands.
184      */

185     public void handle(PacketTransportCommand cmd)
186     {
187         if (LOG.isDebugEnabled())
188             LOG.debug("handle(PortManagerCommand)");
189         
190         if (PacketTransportCommand.QUERY_CONFIG.equals(cmd))
191         {
192             getContext().respond(new LocalAddress(_tcpIP, _tcpPort));
193         }
194         else
195         if (PacketTransportCommand.TERMINATE.equals(cmd))
196         {
197             closePort();
198             getContext().respond(new Terminated());
199         }
200         else
201         if (PacketTransportCommand.QUERY_STATS.equals(cmd))
202         {
203             PacketTransportStatistics stats = new PacketTransportStatistics();
204             stats.port = _tcpPort;
205             stats.numberClientsAccepted = _noClients;
206             stats.timeStarted = _timeStarted;
207             getContext().respond(stats);
208         }
209         else
210         {
211             LOG.warn("Unrecognized command: " + cmd);
212         }
213     }
214     
215     // ---------------------------------------------------------------------
216
// Implementation Methods
217
// ---------------------------------------------------------------------
218

219     /**
220      * Open the previously cached TCP port. This method is used to open
221      * the port when the channelListPort is not received until after the
222      * tcpPort is received.
223      */

224     private void open()
225     {
226         open(_tcpIP, _tcpPort, _fromPort);
227     }
228     
229     /**
230      * Open a tcp port.
231      *
232      * @param port
233      */

234     private void open(InetAddress JavaDoc addr, int port, Port responsePort)
235     {
236         try
237         {
238             openPort(addr,port);
239             getContext().respond(responsePort, new PortManagerReady());
240             try
241             {
242                 getContext().getPort().send(getContext().createMessage(new AcceptClient()));
243             }
244             catch (PortException e1)
245             {
246                 LOG.error("Failed to send initial accept-client", e1);
247             }
248             _timeStarted = System.currentTimeMillis();
249         }
250         catch (IOException JavaDoc e)
251         {
252             String JavaDoc s = "Unable to open port: " +port;
253             getContext().respond(new PacketTransportException(s,e));
254         }
255     }
256     
257     /**
258      * Close existing server socket, if open.
259      */

260     private void closePort()
261     {
262         if (_open)
263         {
264             try
265             {
266                 if (null != _socketChannel) _socketChannel.close();
267             }
268             catch (IOException JavaDoc e)
269             {
270                 LOG.error("Unexpected error while closing server socket: ", e);
271             }
272         }
273         _open = false;
274     }
275     
276     /**
277      * Publish the server-side socket channel;
278      * @param p the port to bind the channel to.
279      * @throws IOException if an error occurs during the socket
280      * channel creation.
281      */

282     private void openPort(InetAddress JavaDoc addr, int p) throws IOException JavaDoc
283     {
284         // If already opened, close and open on new.
285
closePort();
286         _tcpPort = p;
287         _socketChannel = ServerSocketChannel.open();
288         _socketChannel.socket().setReuseAddress(true);
289         InetSocketAddress JavaDoc sa = new InetSocketAddress JavaDoc(addr,p);
290         _socketChannel.socket().bind(sa);
291         if (LOG.isInfoEnabled())
292         {
293             LOG.info("Port bound: " + Integer.toString(p));
294         }
295         _socketChannel.configureBlocking(false);
296         _selector = Selector.open();
297         _socketChannel.register(_selector, SelectionKey.OP_ACCEPT);
298         LOG.debug("Port opened");
299         _open = true;
300     }
301     
302     /**
303      * Wait for a client to connect.
304      * @return
305      */

306     private SocketChannel JavaDoc waitForClient(long maxWait_msec) throws IOException JavaDoc
307     {
308         SocketChannel JavaDoc ch = null;
309         
310         _selector.select(maxWait_msec);
311         Set JavaDoc selectedKeys = _selector.selectedKeys();
312         for (Iterator JavaDoc it = selectedKeys.iterator(); it.hasNext(); )
313         {
314             SelectionKey JavaDoc key = (SelectionKey JavaDoc) it.next();
315             it.remove();
316             key.cancel();
317             _selector.selectNow();
318             ch = _socketChannel.accept();
319             LOG.info("Got client");
320             _socketChannel.register(_selector,SelectionKey.OP_ACCEPT);
321         }
322         if (null != ch)
323         {
324             _noClients++;
325         }
326         return ch;
327     }
328 }
329
Popular Tags