1 2 package transport.packet; 3 4 import java.io.IOException ; 5 import java.nio.channels.ClosedChannelException ; 6 import java.nio.channels.SelectionKey ; 7 import java.nio.channels.Selector ; 8 import java.nio.channels.SocketChannel ; 9 import java.util.Iterator ; 10 import java.util.List ; 11 import java.util.Set ; 12 import java.util.Vector ; 13 14 import org.apache.commons.logging.Log; 15 import org.apache.commons.logging.LogFactory; 16 17 import transport.channel.AddChannelCommand; 18 import transport.channel.ChannelUtils; 19 import transport.channel.RemoveChannelCommand; 20 21 import jegg.EggBase; 22 import jegg.PortException; 23 import jegg.UnableToInitializeException; 24 import jegg.Port; 25 26 29 public class PacketReader extends EggBase 30 { 31 private static final Log LOG = LogFactory.getLog(PacketReader.class); 32 private static final ReadPacketsInternalCommand READ_PACKETS = new ReadPacketsInternalCommand(); 33 private static final long MAX_BLOCK_MSEC = 500; 34 35 private List readyChannels = new Vector (); 36 private List packetList = new Vector (); 37 private Selector selector; 38 39 private Port channelListPort; 40 41 45 public PacketReader() 46 { 47 super(); 48 } 49 50 public void init() throws UnableToInitializeException 51 { 52 try 53 { 54 selector = Selector.open(); 55 } 56 catch (IOException e) 57 { 58 throw new UnableToInitializeException("PacketReader", e); 59 } 60 try 61 { 62 LOG.debug("Starting to read packets"); 63 getContext().getPort().send(new ReadPacketsInternalCommand()); 64 } 65 catch (PortException e1) 66 { 67 LOG.error("Unable to initiate packet reading", e1); 68 throw new UnableToInitializeException(e1); 69 } 70 } 71 75 public void handle(Port p) 76 { 77 if (LOG.isDebugEnabled()) 78 LOG.debug("handle("+p+")"); 79 80 channelListPort = p; 81 getContext().bindToPort(channelListPort); 82 } 83 84 87 public void handle(Object message) 88 { 89 LOG.warn("Unexpected message: " + message); 90 } 91 92 96 public void 
handle(AddChannelCommand add) 97 { 98 LOG.info("Got new channel"); 100 101 SocketChannel ch = add.getChannel(); 102 int before = selector.keys().size(); 103 try 104 { 105 ch.configureBlocking(false); 106 SelectionKey key = ch.register(selector,SelectionKey.OP_READ); 107 key.attach(ch); 108 } 109 catch (ClosedChannelException e) 110 { 111 getContext().respond(e); 112 } 113 catch (IOException e) 114 { 115 getContext().respond(e); 116 } 117 int after = selector.keys().size(); 118 119 if (0 == before && 0 < after) 120 { 121 try 122 { 123 getPort().send(getContext().createMessage(READ_PACKETS)); 124 } 125 catch (PortException e1) 126 { 127 LOG.error("Failed to send READ_PACKETS message", e1); 128 } 129 } 130 } 131 132 136 public void handle(RemoveChannelCommand rmv) 137 { 138 int id = rmv.getID(); 139 Set keys = selector.keys(); 140 for (Iterator it=keys.iterator(); it.hasNext(); ) 141 { 142 SelectionKey key = (SelectionKey ) it.next(); 143 SocketChannel ch = (SocketChannel ) key.attachment(); 144 if (ch.hashCode() == id) 145 { 146 key.cancel(); 147 it.remove(); 148 break; 149 } 150 } 151 } 152 153 158 public void handle(ReadPacketsInternalCommand rp) 159 { 160 LOG.debug("handle(ReadPacketsInternalCommand"); 161 162 if (0 == selector.keys().size()) 163 { 164 LOG.debug("No connected clients"); 165 return; 166 } 167 168 try 169 { 170 int n = selector.select(MAX_BLOCK_MSEC); 171 if (0 < n) 172 { 173 readPackets(); 174 } 175 } 176 catch (IOException e) 177 { 178 LOG.error("Error in 'select'", e); 179 } 180 181 if ( 0 < selector.keys().size()) 182 { 183 try 184 { 185 getPort().send(getContext().getCurrentMessage()); 186 } 187 catch (PortException e1) 188 { 189 LOG.error("Failed to send read-packets internal command",e1); 190 } 191 } 192 } 193 194 198 201 private void readPackets() 202 { 203 getReadyChannels(); 204 readReadyChannels(); 205 publishPackets(); 206 } 207 208 211 private void getReadyChannels() 212 { 213 readyChannels.clear(); 214 Set ready = 
selector.selectedKeys(); 215 if (0 < ready.size()) 216 { 217 for (Iterator it=ready.iterator(); it.hasNext(); ) 218 { 219 SelectionKey key = (SelectionKey ) it.next(); 220 readyChannels.add(key.attachment()); 221 key.cancel(); 222 it.remove(); 223 } 224 try 225 { 226 selector.selectNow(); } 228 catch (IOException t) 229 { 230 } 232 } 233 } 234 235 238 private void readReadyChannels() 239 { 240 packetList.clear(); 241 for (Iterator it = readyChannels.iterator(); it.hasNext(); ) 242 { 243 SocketChannel ch = (SocketChannel ) it.next(); 244 Packet p = null; 245 try 246 { 247 LOG.debug("Reading channel"); 248 p = ChannelUtils.readChannel(ch); 249 } 250 catch (IOException e) 251 { 252 LOG.error("Failed to read channel - dropping: ", e); 253 dropChannel(ch); 254 continue; 255 } 256 257 if (null != p) 258 { 259 packetList.add(p); 260 } 261 262 try 263 { 264 ch.configureBlocking(false); 265 SelectionKey key = ch.register(selector, SelectionKey.OP_READ); 266 key.attach(ch); 267 } 268 catch (ClosedChannelException e) 269 { 270 LOG.debug("Channel is closed - dropping: " ); 271 dropChannel(ch); 272 } 273 catch (IOException e) 274 { 275 LOG.debug("Channel has error - dropping: " ); 276 dropChannel(ch); 277 } 278 } 279 } 280 281 285 private void dropChannel(SocketChannel ch) 286 { 287 try 288 { 289 channelListPort.send(getContext().createMessage(new RemoveChannelCommand(ch.hashCode()))); 290 } 291 catch (PortException e) 292 { 293 LOG.error("Failed to send remove-channel command to channel list", e); 294 } 295 } 296 297 private void publishPackets() 298 { 299 if (LOG.isDebugEnabled()) 300 LOG.debug("Publishing "+packetList.size()+" packets"); 301 if (0 < packetList.size()) 302 { 303 for (Iterator it=packetList.iterator(); it.hasNext(); ) 304 { 305 Packet p = (Packet) it.next(); 306 getContext().send(p); 308 } 309 } 310 } 311 } 312
// | Popular Tags
// |