KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > transport > packet > PacketWriter


1
2 package transport.packet;
3 import java.io.IOException JavaDoc;
4 import java.net.InetAddress JavaDoc;
5 import java.net.InetSocketAddress JavaDoc;
6 import java.net.Socket JavaDoc;
7 import java.net.SocketAddress JavaDoc;
8 import java.nio.ByteBuffer JavaDoc;
9 import java.nio.channels.SocketChannel JavaDoc;
10 import java.util.Collection JavaDoc;
11 import java.util.HashMap JavaDoc;
12 import java.util.Iterator JavaDoc;
13 import java.util.List JavaDoc;
14 import java.util.Map JavaDoc;
15 import java.util.Vector JavaDoc;
16
17 import org.apache.commons.logging.Log;
18 import org.apache.commons.logging.LogFactory;
19
20 import transport.channel.AddChannelCommand;
21 import transport.channel.GetChannelCommand;
22 import transport.channel.GetChannelResponse;
23 import transport.channel.RemoveChannelCommand;
24
25 import jegg.EggBase;
26 import jegg.PortException;
27 import jegg.Port;
28
29
30 /**
31  * PacketWriter writes to the network any packets that it receives. The
32  * channel to write the packet is identified by the channelID associated
33  * with the packet (see Packet.getChannelID()). If the channel ID is
34  * zero, then it is assumed that a new channel should be opened to write
35  * the packet to. The remote address to connect to is assumed, in this
36  * case, to be the socket address bound to the 'to' peer in the packet
37  * header (see Peer.getAddress()). When a new channel is opened, it is
38  * sent to the ChannelList.
39  */

40 public class PacketWriter extends EggBase
41 {
42     private static final Log LOG = LogFactory.getLog(PacketWriter.class);
43     private static long nextSerial = System.currentTimeMillis();
44     
45     private Port channelListPort;
46     private Map JavaDoc channels = new HashMap JavaDoc();
47     private Map JavaDoc pending = new HashMap JavaDoc();
48     
49     // --------------------------------------------------------------------
50
// Constructors
51
// --------------------------------------------------------------------
52

53     public PacketWriter()
54     {
55         super();
56     }
57
58     public void init()
59     {
60         getContext().bindToPort(channelListPort);
61     }
62     
63     // --------------------------------------------------------------------
64
// Event Handlers
65
// --------------------------------------------------------------------
66

67     public void handle(Port p)
68     {
69         if (LOG.isDebugEnabled())
70             LOG.debug("handle("+p+")");
71            
72         channelListPort = p;
73     }
74     
75     /* (non-Javadoc)
76      * @see egg.Egg#handle(java.lang.Object)
77      */

78     public void handle(Object JavaDoc message)
79     {
80         LOG.warn("Unexpected message: " + message);
81     }
82     
83     /**
84      * Write a packet.
85      * @param p the packet to write.
86      */

87     public void handle(Packet p)
88     {
89         LOG.debug("Got packet");
90         
91         // If packet without 'channel ID' set, then interpret
92
// that to mean that a new socket connection should be opened
93
// to the packet destination.
94

95         p.setSerial(nextSerial++);
96         int ch_id = p.getChannelID();
97         SocketChannel JavaDoc ch = null;
98         
99         if (0 >= ch_id)
100         {
101             InetAddress JavaDoc ia = p.getIP();
102             int port = p.getPort();
103             
104             if (null == ia)
105             {
106                 LOG.error("Unable to send packet "
107                         + p.getSerial()
108                         + ": channel ID not set");
109             }
110             else
111             {
112                 try
113                 {
114                     SocketChannel JavaDoc newChan = SocketChannel.open();
115                     newChan.connect(new InetSocketAddress JavaDoc(ia,port));
116                     channelListPort.send(getContext().createMessage(new AddChannelCommand(newChan)));
117                     writePacket(p,newChan);
118                 }
119                 catch (IOException JavaDoc e)
120                 {
121                     LOG.error("Unable to send packet", e);
122                 } catch (PortException e)
123                 {
124                     LOG.error("Unable to send add-channel command", e);
125                 }
126             }
127         }
128         else
129         {
130             Object JavaDoc key = new Integer JavaDoc(ch_id);
131             ch = (SocketChannel JavaDoc) channels.get(key);
132         
133             if (null == ch)
134             {
135                 Collection JavaDoc list = (Collection JavaDoc) pending.get(key);
136                 if (null == list)
137                 {
138                     list = new Vector JavaDoc();
139                     pending.put(key,list);
140                 }
141                 list.add(p);
142                 try
143                 {
144                     channelListPort.send(getContext().createMessage(new GetChannelCommand(ch_id)));
145                 }
146                 catch (PortException e)
147                 {
148                     LOG.error("Unable to send get-channel command", e);
149                 }
150             }
151             else
152             {
153                 writePacket(p,ch);
154             }
155         }
156     }
157
158     public void handle(GetChannelResponse r)
159     {
160         SocketChannel JavaDoc ch = r.getChannel();
161         Integer JavaDoc key = new Integer JavaDoc(ch.hashCode());
162         channels.put(key,ch);
163         Collection JavaDoc list = (Collection JavaDoc) pending.get(key);
164         if (null != list && 0 < list.size())
165         {
166             for (Iterator JavaDoc it=list.iterator(); it.hasNext(); )
167             {
168                 handle((Packet) it.next());
169             }
170         }
171     }
172     
173     /**
174      * This is the response to a 'get-channel' request
175      * issued to the channel list for a packet that needs
176      * to be published.
177      */

178     public void handle(SocketChannel JavaDoc ch)
179     {
180         Integer JavaDoc channelID = new Integer JavaDoc(ch.hashCode());
181         List JavaDoc list = (List JavaDoc) pending.get(channelID);
182         if (null != list)
183         {
184             writePackets(list,ch);
185             pending.remove(channelID);
186         }
187         channels.put(channelID,ch);
188     }
189     
190     /**
191      * Handle 'remove-channel' event from channel list.
192      */

193     public void handle(RemoveChannelCommand rmv)
194     {
195         cleanupChannel(rmv.getID());
196     }
197     
198     /**
199      * Handle 'add-channel' from channel list. These are
200      * ignored because we only cache channels as we get packets
201      * to write.
202      */

203     public void handle(AddChannelCommand add)
204     {
205         // IGNORE
206
}
207     
208     // --------------------------------------------------------------------
209
// Implementation Support Methods
210
// --------------------------------------------------------------------
211

212     private SocketChannel JavaDoc openNewChannel(SocketAddress JavaDoc addr) throws IOException JavaDoc
213     {
214         SocketChannel JavaDoc newChannel = null;
215         newChannel = SocketChannel.open();
216         Socket JavaDoc sock = newChannel.socket();
217         sock.connect(addr);
218         return newChannel;
219     }
220
221     private void writePackets(List JavaDoc list, SocketChannel JavaDoc ch)
222     {
223         boolean dropall = false;
224         for (Iterator JavaDoc it=list.iterator(); it.hasNext(); )
225         {
226             Packet p = (Packet) it.next();
227             if (!dropall)
228             {
229                 if (!writePacket(p,ch))
230                 {
231                     dropall = true;
232                 }
233             }
234             else
235             {
236                 LOG.warn("Dropping packet "+p.getSerial()+" due to channel error");
237             }
238         }
239         list.clear();
240     }
241
242     private boolean writePacket(Packet p, SocketChannel JavaDoc ch)
243     {
244         if (LOG.isDebugEnabled())
245             LOG.debug("Writing packet #"+p.getSerial()+" to channel "+p.getChannelID());
246         try
247         {
248             byte[] data = p.toBytes();
249             ByteBuffer JavaDoc data_buf = ByteBuffer.allocate( data.length + 4 );
250             data_buf.clear();
251             data_buf.putInt( data.length );
252             data_buf.put( data );
253             data_buf.flip();
254             while( data_buf.hasRemaining() )
255             {
256                 LOG.debug("Writing data to channel");
257                 if( 0 == ch.write( data_buf ) )
258                 {
259                     throw new IOException JavaDoc(
260                         "TCP send buffer OVERFLOW on channel "+ch.hashCode());
261                 }
262             }
263         }
264         catch (IOException JavaDoc e)
265         {
266             LOG.error("Error writing packet "+p.getSerial()+" to channel "+ch.hashCode());
267             dropChannel(ch);
268             return false;
269         }
270         return true;
271     }
272     
273     private void dropChannel(SocketChannel JavaDoc ch)
274     {
275         int id = ch.hashCode();
276         if (LOG.isInfoEnabled())
277             LOG.info("Dropping channel "+id);
278         cleanupChannel(ch.hashCode());
279         try
280         {
281             channelListPort.send(getContext().createMessage(new RemoveChannelCommand(id)));
282         }
283         catch (PortException e)
284         {
285             LOG.error("Unable to send remove-channel command", e);
286         }
287     }
288     
289     private void cleanupChannel(int id)
290     {
291         Integer JavaDoc channelID = new Integer JavaDoc(id);
292         channels.remove(channelID);
293         List JavaDoc list = (List JavaDoc) pending.get(channelID);
294         if (null != list)
295         {
296             pending.remove(channelID);
297             logDroppedPackets(list);
298         }
299     }
300     
301     private void logDroppedPackets(List JavaDoc list)
302     {
303         if (LOG.isInfoEnabled())
304         {
305             for (Iterator JavaDoc it=list.iterator(); it.hasNext(); )
306             {
307                 Packet p = (Packet) it.next();
308                 LOG.info("Dropping packet: " +p);
309             }
310         }
311     }
312 }
Popular Tags