1 2 package transport.packet; 3 import java.io.IOException ; 4 import java.net.InetAddress ; 5 import java.net.InetSocketAddress ; 6 import java.net.Socket ; 7 import java.net.SocketAddress ; 8 import java.nio.ByteBuffer ; 9 import java.nio.channels.SocketChannel ; 10 import java.util.Collection ; 11 import java.util.HashMap ; 12 import java.util.Iterator ; 13 import java.util.List ; 14 import java.util.Map ; 15 import java.util.Vector ; 16 17 import org.apache.commons.logging.Log; 18 import org.apache.commons.logging.LogFactory; 19 20 import transport.channel.AddChannelCommand; 21 import transport.channel.GetChannelCommand; 22 import transport.channel.GetChannelResponse; 23 import transport.channel.RemoveChannelCommand; 24 25 import jegg.EggBase; 26 import jegg.PortException; 27 import jegg.Port; 28 29 30 40 public class PacketWriter extends EggBase 41 { 42 private static final Log LOG = LogFactory.getLog(PacketWriter.class); 43 private static long nextSerial = System.currentTimeMillis(); 44 45 private Port channelListPort; 46 private Map channels = new HashMap (); 47 private Map pending = new HashMap (); 48 49 53 public PacketWriter() 54 { 55 super(); 56 } 57 58 public void init() 59 { 60 getContext().bindToPort(channelListPort); 61 } 62 63 67 public void handle(Port p) 68 { 69 if (LOG.isDebugEnabled()) 70 LOG.debug("handle("+p+")"); 71 72 channelListPort = p; 73 } 74 75 78 public void handle(Object message) 79 { 80 LOG.warn("Unexpected message: " + message); 81 } 82 83 87 public void handle(Packet p) 88 { 89 LOG.debug("Got packet"); 90 91 95 p.setSerial(nextSerial++); 96 int ch_id = p.getChannelID(); 97 SocketChannel ch = null; 98 99 if (0 >= ch_id) 100 { 101 InetAddress ia = p.getIP(); 102 int port = p.getPort(); 103 104 if (null == ia) 105 { 106 LOG.error("Unable to send packet " 107 + p.getSerial() 108 + ": channel ID not set"); 109 } 110 else 111 { 112 try 113 { 114 SocketChannel newChan = SocketChannel.open(); 115 newChan.connect(new InetSocketAddress (ia,port)); 116 channelListPort.send(getContext().createMessage(new AddChannelCommand(newChan))); 117 writePacket(p,newChan); 118 } 119 catch (IOException e) 120 { 121 LOG.error("Unable to send packet", e); 122 } catch (PortException e) 123 { 124 LOG.error("Unable to send add-channel command", e); 125 } 126 } 127 } 128 else 129 { 130 Object key = new Integer (ch_id); 131 ch = (SocketChannel ) channels.get(key); 132 133 if (null == ch) 134 { 135 Collection list = (Collection ) pending.get(key); 136 if (null == list) 137 { 138 list = new Vector (); 139 pending.put(key,list); 140 } 141 list.add(p); 142 try 143 { 144 channelListPort.send(getContext().createMessage(new GetChannelCommand(ch_id))); 145 } 146 catch (PortException e) 147 { 148 LOG.error("Unable to send get-channel command", e); 149 } 150 } 151 else 152 { 153 writePacket(p,ch); 154 } 155 } 156 } 157 158 public void handle(GetChannelResponse r) 159 { 160 SocketChannel ch = r.getChannel(); 161 Integer key = new Integer (ch.hashCode()); 162 channels.put(key,ch); 163 Collection list = (Collection ) pending.get(key); 164 if (null != list && 0 < list.size()) 165 { 166 for (Iterator it=list.iterator(); it.hasNext(); ) 167 { 168 handle((Packet) it.next()); 169 } 170 } 171 } 172 173 178 public void handle(SocketChannel ch) 179 { 180 Integer channelID = new Integer (ch.hashCode()); 181 List list = (List ) pending.get(channelID); 182 if (null != list) 183 { 184 writePackets(list,ch); 185 pending.remove(channelID); 186 } 187 channels.put(channelID,ch); 188 } 189 190 193 public void handle(RemoveChannelCommand rmv) 194 { 195 cleanupChannel(rmv.getID()); 196 } 197 198 203 public void handle(AddChannelCommand add) 204 { 205 } 207 208 212 private SocketChannel openNewChannel(SocketAddress addr) throws IOException 213 { 214 SocketChannel newChannel = null; 215 newChannel = SocketChannel.open(); 216 Socket sock = newChannel.socket(); 217 sock.connect(addr); 218 return newChannel; 219 } 220 221 private void writePackets(List list, SocketChannel ch) 222 { 223 boolean dropall = false; 224 for (Iterator it=list.iterator(); it.hasNext(); ) 225 { 226 Packet p = (Packet) it.next(); 227 if (!dropall) 228 { 229 if (!writePacket(p,ch)) 230 { 231 dropall = true; 232 } 233 } 234 else 235 { 236 LOG.warn("Dropping packet "+p.getSerial()+" due to channel error"); 237 } 238 } 239 list.clear(); 240 } 241 242 private boolean writePacket(Packet p, SocketChannel ch) 243 { 244 if (LOG.isDebugEnabled()) 245 LOG.debug("Writing packet #"+p.getSerial()+" to channel "+p.getChannelID()); 246 try 247 { 248 byte[] data = p.toBytes(); 249 ByteBuffer data_buf = ByteBuffer.allocate( data.length + 4 ); 250 data_buf.clear(); 251 data_buf.putInt( data.length ); 252 data_buf.put( data ); 253 data_buf.flip(); 254 while( data_buf.hasRemaining() ) 255 { 256 LOG.debug("Writing data to channel"); 257 if( 0 == ch.write( data_buf ) ) 258 { 259 throw new IOException ( 260 "TCP send buffer OVERFLOW on channel "+ch.hashCode()); 261 } 262 } 263 } 264 catch (IOException e) 265 { 266 LOG.error("Error writing packet "+p.getSerial()+" to channel "+ch.hashCode()); 267 dropChannel(ch); 268 return false; 269 } 270 return true; 271 } 272 273 private void dropChannel(SocketChannel ch) 274 { 275 int id = ch.hashCode(); 276 if (LOG.isInfoEnabled()) 277 LOG.info("Dropping channel "+id); 278 cleanupChannel(ch.hashCode()); 279 try 280 { 281 channelListPort.send(getContext().createMessage(new RemoveChannelCommand(id))); 282 } 283 catch (PortException e) 284 { 285 LOG.error("Unable to send remove-channel command", e); 286 } 287 } 288 289 private void cleanupChannel(int id) 290 { 291 Integer channelID = new Integer (id); 292 channels.remove(channelID); 293 List list = (List ) pending.get(channelID); 294 if (null != list) 295 { 296 pending.remove(channelID); 297 logDroppedPackets(list); 298 } 299 } 300 301 private void logDroppedPackets(List list) 302 { 303 if (LOG.isInfoEnabled()) 304 { 305 for (Iterator it=list.iterator(); it.hasNext(); ) 306 { 307 Packet p = (Packet) it.next(); 308 LOG.info("Dropping packet: " +p); 309 } 310 } 311 } 312 } | Popular Tags |