1 package org.sapia.ubik.mcast; 2 3 import org.sapia.ubik.mcast.server.MulticastServer; 4 5 import java.io.*; 6 7 import java.net.DatagramPacket ; 8 import java.net.MulticastSocket ; 9 import org.sapia.ubik.rmi.server.Log; 10 11 12 22 public class BroadcastDispatcherImpl 23 implements BroadcastDispatcher { 24 static final int DEFAULT_BUFSZ = 5000; 25 static final int TTL = 7; 26 private boolean _started; 27 private String _node; 28 private String _domain; 29 private BroadcastServer _server; 30 private EventConsumer _consumer; 31 private int _bufsz = DEFAULT_BUFSZ; 32 33 public BroadcastDispatcherImpl(EventConsumer cons, String mcastHost, 34 int mcastPort) throws IOException { 35 _server = new BroadcastServer(cons, 15, mcastHost, mcastPort, TTL); 36 _server.setBufsize(_bufsz); 37 _consumer = cons; 38 _node = cons.getNode(); 39 _domain = cons.getDomainName().toString(); 40 } 41 42 public BroadcastDispatcherImpl(String node, String domain, String mcastHost, 43 int mcastPort, int ttl) throws IOException { 44 this(new EventConsumer(node, domain), mcastHost, mcastPort); 45 } 46 47 public BroadcastDispatcherImpl(String domain, String mcastHost, int mcastPort) 48 throws IOException { 49 this(new EventConsumer(domain), mcastHost, mcastPort); 50 } 51 52 62 public void setBufsize(int size) { 63 _bufsz = size; 64 _server.setBufsize(size); 65 } 66 67 72 public String getNode() { 73 return _node; 74 } 75 76 79 public synchronized void close() { 80 if (_server != null) { 81 _server.close(); 82 _server = null; 83 } 84 } 85 86 89 public void dispatch(boolean alldomains, String evtType, Object data) 90 throws IOException { 91 RemoteEvent evt; 92 93 if (alldomains) { 94 evt = new RemoteEvent(null, evtType, data).setNode(_node); 95 } else { 96 evt = new RemoteEvent(_domain, evtType, data).setNode(_node); 97 } 98 99 _server.send(Util.toBytes(evt, _bufsz)); 100 } 101 102 105 public void dispatch(String domain, String evtType, Object data) 106 throws IOException { 107 RemoteEvent evt; 108 109 if(Log.isDebug()){ 110 Log.debug(getClass(), "Sending event bytes for: " + evtType); 111 } 112 evt = new RemoteEvent(domain, evtType, data).setNode(_node); 113 _server.send(Util.toBytes(evt, _bufsz)); 114 } 115 116 122 public synchronized void registerAsyncListener(String evtType, 123 AsyncEventListener listener) { 124 _consumer.registerAsyncListener(evtType, listener); 125 } 126 127 132 public synchronized void unregisterListener(AsyncEventListener listener) { 133 _consumer.unregisterListener(listener); 134 } 135 136 139 public void start() { 140 _server.setDaemon(true); 141 _server.start(); 142 } 143 144 147 public String getMulticastAddress() { 148 return _server.getMulticastAddress(); 149 } 150 151 154 public int getMulticastPort() { 155 return _server.getMulticastPort(); 156 } 157 158 161 162 171 static class BroadcastServer extends MulticastServer { 172 EventConsumer _consumer; 173 174 public BroadcastServer(EventConsumer consumer, int soTimeout, 175 String mcastAddress, int mcastPort, int ttl) throws IOException { 176 super("ubik.mcast.BroadcastServer", soTimeout, mcastAddress, mcastPort, 177 ttl); 178 _consumer = consumer; 179 } 180 181 protected void handle(DatagramPacket pack, MulticastSocket sock) { 182 try { 183 _consumer.onAsyncEvent((RemoteEvent) Util.fromDatagram(pack)); 184 } catch (Exception e) { 185 Log.error(getClass(), "Could not deserialize remote event", e); 186 } 187 } 188 189 protected void handlePacketSizeToShort(DatagramPacket pack) { 190 System.err.println("Packet size to short: " + pack.getLength() + 191 " - increase buffer size to correct."); 192 } 193 194 protected void handleSoTimeout() { 195 } 196 } 197 } 198 | Popular Tags |