1 package transport.packet; 2 3 import java.util.Collection ; 4 import java.util.HashMap ; 5 import java.util.HashSet ; 6 import java.util.Iterator ; 7 import java.util.Map ; 8 import org.apache.commons.logging.Log; 9 import org.apache.commons.logging.LogFactory; 10 11 import transport.AddPacketListenerCommand; 12 import transport.RemovePacketListenerCommand; 13 14 import jegg.EggBase; 15 import jegg.Message; 16 import jegg.PortException; 17 import jegg.Port; 18 19 25 public class PacketPublisher extends EggBase 26 { 27 28 private static final Log LOG = LogFactory.getLog(PacketPublisher.class); 29 30 private static final int LISTENER_LIST_INITIAL_CAPACITY = 5; 31 32 private static final int LISTENER_MAP_INITIAL_CAPACITY = 10; 33 34 private Port packetReaderPort; 35 36 private Map listeners = new HashMap (LISTENER_MAP_INITIAL_CAPACITY); 37 38 39 public PacketPublisher() 40 { 41 super(); 42 } 43 44 public void init() 45 { 46 getContext().bindToPort(packetReaderPort); 47 } 48 49 53 public void handle(Port p) 54 { 55 if (LOG.isDebugEnabled()) 56 LOG.debug("handle("+p+")"); 57 58 packetReaderPort = p; 59 } 68 69 72 public void handle(Object message) 73 { 74 LOG.warn("Unexpected message: "+ message); 75 } 76 77 81 public void handle(AddPacketListenerCommand al) 82 { 83 LOG.debug("Received new packet listener"); 84 85 Object type = al.getType(); 86 if (null == type) type = PacketTypeEnum.WILDCARD; 87 Object port = al.getPort(); 88 Collection list = (Collection ) listeners.get(type); 89 if (null == list) 90 { 91 list = new HashSet (LISTENER_LIST_INITIAL_CAPACITY); 92 listeners.put(type,list); 93 } 94 list.add(port); 95 } 96 97 101 public void handle(RemovePacketListenerCommand rl) 102 { 103 Object type = rl.getType(); 104 Object port = rl.getPort(); 105 Collection list = (Collection ) listeners.get(type); 106 107 if (null != list) 108 { 109 list.remove(port); 110 } 111 } 112 113 117 public void handle(Packet p) 118 { 119 LOG.debug("Publishing packet to listeners"); 120 121 Object type = p.getType(); 122 publishToTypeListeners(p, type); 123 publishToTypeListeners(p, PacketTypeEnum.WILDCARD); 124 } 125 126 public void handle(AddPacketSource s) 127 { 128 getContext().bindToPort(s.getSource()); 129 } 130 131 135 private void publishToTypeListeners(Packet p, Object type) 136 { 137 Collection list = (Collection ) listeners.get(type); 138 if (null != list) 139 { 140 Message msg = getContext().createMessage(p); 141 142 for (Iterator it = list.iterator(); it.hasNext(); ) 143 { 144 LOG.debug("Delivering packet to listener"); 145 Port port = (Port) it.next(); 146 try 147 { 148 port.send(msg); 149 } 150 catch (PortException e) 151 { 152 LOG.error("Failed to send packet to type listeners", e); 153 } 154 } 155 } 156 } 157 158 } 159 | Popular Tags |