KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > transport > packet > PacketReader


1
2 package transport.packet;
3
4 import java.io.IOException JavaDoc;
5 import java.nio.channels.ClosedChannelException JavaDoc;
6 import java.nio.channels.SelectionKey JavaDoc;
7 import java.nio.channels.Selector JavaDoc;
8 import java.nio.channels.SocketChannel JavaDoc;
9 import java.util.Iterator JavaDoc;
10 import java.util.List JavaDoc;
11 import java.util.Set JavaDoc;
12 import java.util.Vector JavaDoc;
13
14 import org.apache.commons.logging.Log;
15 import org.apache.commons.logging.LogFactory;
16
17 import transport.channel.AddChannelCommand;
18 import transport.channel.ChannelUtils;
19 import transport.channel.RemoveChannelCommand;
20
21 import jegg.EggBase;
22 import jegg.PortException;
23 import jegg.UnableToInitializeException;
24 import jegg.Port;
25
26 /**
27  *
28  */

29 public class PacketReader extends EggBase
30 {
31     private static final Log LOG = LogFactory.getLog(PacketReader.class);
32     private static final ReadPacketsInternalCommand READ_PACKETS = new ReadPacketsInternalCommand();
33     private static final long MAX_BLOCK_MSEC = 500;
34     
35     private List JavaDoc readyChannels = new Vector JavaDoc();
36     private List JavaDoc packetList = new Vector JavaDoc();
37     private Selector JavaDoc selector;
38     
39     private Port channelListPort;
40     
41     // --------------------------------------------------------------------
42
// Constructors
43
// --------------------------------------------------------------------
44

45     public PacketReader()
46     {
47         super();
48     }
49     
50     public void init() throws UnableToInitializeException
51     {
52         try
53         {
54             selector = Selector.open();
55         }
56         catch (IOException JavaDoc e)
57         {
58             throw new UnableToInitializeException("PacketReader", e);
59         }
60         try
61         {
62             LOG.debug("Starting to read packets");
63             getContext().getPort().send(new ReadPacketsInternalCommand());
64         }
65         catch (PortException e1)
66         {
67             LOG.error("Unable to initiate packet reading", e1);
68             throw new UnableToInitializeException(e1);
69         }
70     }
71     // --------------------------------------------------------------------
72
// Event Handlers
73
// --------------------------------------------------------------------
74

75     public void handle(Port p)
76     {
77         if (LOG.isDebugEnabled())
78             LOG.debug("handle("+p+")");
79         
80         channelListPort = p;
81         getContext().bindToPort(channelListPort);
82     }
83     
84     /* (non-Javadoc)
85      * @see egg.Egg#handle(java.lang.Object)
86      */

87     public void handle(Object JavaDoc message)
88     {
89         LOG.warn("Unexpected message: " + message);
90     }
91     
92     /**
93      * Accept new socket channel to read packets from.
94      * @param ch new channel.
95      */

96     public void handle(AddChannelCommand add)
97     {
98         // TODO: handle case of got channel before got ChannelList port
99
LOG.info("Got new channel");
100         
101         SocketChannel JavaDoc ch = add.getChannel();
102         int before = selector.keys().size();
103         try
104         {
105             ch.configureBlocking(false);
106             SelectionKey JavaDoc key = ch.register(selector,SelectionKey.OP_READ);
107             key.attach(ch);
108         }
109         catch (ClosedChannelException JavaDoc e)
110         {
111             getContext().respond(e);
112         }
113         catch (IOException JavaDoc e)
114         {
115             getContext().respond(e);
116         }
117         int after = selector.keys().size();
118         
119         if (0 == before && 0 < after)
120         {
121             try
122             {
123                 getPort().send(getContext().createMessage(READ_PACKETS));
124             }
125             catch (PortException e1)
126             {
127                 LOG.error("Failed to send READ_PACKETS message", e1);
128             }
129         }
130     }
131     
132     /**
133      * Handle 'remove channel' event from channel list.
134      * @param rmv event holding ID of channel to remove.
135      */

136     public void handle(RemoveChannelCommand rmv)
137     {
138         int id = rmv.getID();
139         Set JavaDoc keys = selector.keys();
140         for (Iterator JavaDoc it=keys.iterator(); it.hasNext(); )
141         {
142             SelectionKey JavaDoc key = (SelectionKey JavaDoc) it.next();
143             SocketChannel JavaDoc ch = (SocketChannel JavaDoc) key.attachment();
144             if (ch.hashCode() == id)
145             {
146                 key.cancel();
147                 it.remove();
148                 break;
149             }
150         }
151     }
152     
153     /**
154      * ReadPackets is used to trigger a check of the
155      * channels for packets.
156      * @param rp event used to trigger packet read.
157      */

158     public void handle(ReadPacketsInternalCommand rp)
159     {
160         LOG.debug("handle(ReadPacketsInternalCommand");
161         
162         if (0 == selector.keys().size())
163         {
164             LOG.debug("No connected clients");
165             return;
166         }
167         
168         try
169         {
170             int n = selector.select(MAX_BLOCK_MSEC);
171             if (0 < n)
172             {
173                 readPackets();
174             }
175         }
176         catch (IOException JavaDoc e)
177         {
178             LOG.error("Error in 'select'", e);
179         }
180         
181         if ( 0 < selector.keys().size())
182         {
183             try
184             {
185                 getPort().send(getContext().getCurrentMessage());
186             }
187             catch (PortException e1)
188             {
189                 LOG.error("Failed to send read-packets internal command",e1);
190             }
191         }
192     }
193     
194     // --------------------------------------------------------------------
195
// Implementation Support Methods
196
// --------------------------------------------------------------------
197

198     /**
199      * Read packets from channels with data on them.
200      */

201     private void readPackets()
202     {
203         getReadyChannels();
204         readReadyChannels();
205         publishPackets();
206     }
207     
208     /**
209      * Make list of channels that are ready to be read.
210      */

211     private void getReadyChannels()
212     {
213         readyChannels.clear();
214         Set JavaDoc ready = selector.selectedKeys();
215         if (0 < ready.size())
216         {
217             for (Iterator JavaDoc it=ready.iterator(); it.hasNext(); )
218             {
219                 SelectionKey JavaDoc key = (SelectionKey JavaDoc) it.next();
220                 readyChannels.add(key.attachment());
221                 key.cancel();
222                 it.remove();
223             }
224             try
225             {
226                 selector.selectNow(); // to clear cancelled keys
227
}
228             catch (IOException JavaDoc t)
229             {
230                 // ignore
231
}
232         }
233     }
234
235     /**
236      * Read the channels that are ready to be read.
237      */

238     private void readReadyChannels()
239     {
240         packetList.clear();
241         for (Iterator JavaDoc it = readyChannels.iterator(); it.hasNext(); )
242         {
243             SocketChannel JavaDoc ch = (SocketChannel JavaDoc) it.next();
244             Packet p = null;
245             try
246             {
247                 LOG.debug("Reading channel");
248                 p = ChannelUtils.readChannel(ch);
249             }
250             catch (IOException JavaDoc e)
251             {
252                 LOG.error("Failed to read channel - dropping: ", e);
253                 dropChannel(ch);
254                 continue;
255             }
256             
257             if (null != p)
258             {
259                 packetList.add(p);
260             }
261             
262             try
263             {
264                 ch.configureBlocking(false);
265                 SelectionKey JavaDoc key = ch.register(selector, SelectionKey.OP_READ);
266                 key.attach(ch);
267             }
268             catch (ClosedChannelException JavaDoc e)
269             {
270                 LOG.debug("Channel is closed - dropping: " );
271                 dropChannel(ch);
272             }
273             catch (IOException JavaDoc e)
274             {
275                 LOG.debug("Channel has error - dropping: " );
276                 dropChannel(ch);
277             }
278         }
279     }
280     
281     /**
282      * Drop a channel. Sends a message to the channel list.
283      * @param ch the channel to drop.
284      */

285     private void dropChannel(SocketChannel JavaDoc ch)
286     {
287         try
288         {
289             channelListPort.send(getContext().createMessage(new RemoveChannelCommand(ch.hashCode())));
290         }
291         catch (PortException e)
292         {
293             LOG.error("Failed to send remove-channel command to channel list", e);
294         }
295     }
296         
297     private void publishPackets()
298     {
299         if (LOG.isDebugEnabled())
300             LOG.debug("Publishing "+packetList.size()+" packets");
301         if (0 < packetList.size())
302         {
303             for (Iterator JavaDoc it=packetList.iterator(); it.hasNext(); )
304             {
305                 Packet p = (Packet) it.next();
306                 // TODO: broadcast
307
getContext().send(p);
308             }
309         }
310     }
311 }
312
Popular Tags