1 55 package org.lateralnz.messaging.broadcast; 56 57 import java.io.IOException ; 58 import java.io.Serializable ; 59 import java.net.DatagramPacket ; 60 import java.net.InetAddress ; 61 import java.net.DatagramSocket ; 62 import java.util.Arrays ; 63 64 import org.apache.log4j.Logger; 65 66 import org.lateralnz.common.util.Constants; 67 import org.lateralnz.common.util.SystemUtils; 68 import org.lateralnz.common.wrapper.IntHolder; 69 import org.lateralnz.messaging.AbstractMessageHandler; 70 import org.lateralnz.messaging.Message; 71 import org.lateralnz.messaging.MessageHandler; 72 import org.lateralnz.messaging.util.PacketUtils; 73 74 79 public class BroadcastMessageHandler extends AbstractMessageHandler implements Runnable , MessageHandler, Constants, Serializable { 80 private static final Logger log = Logger.getLogger(BroadcastMessageHandler.class.getName()); 81 private static final int HEADER_SIZE = 12; private static final int SPACER_SIZE = 20; 84 private boolean running = true; 85 86 private byte[] localhost2 = SystemUtils.getLocalhostIP(); 87 private byte[] localhost1 = { (byte)127, (byte)0, (byte)0, (byte)1 }; 88 protected DatagramSocket sock; private int maxbuf; private int receiveBufSize; private IntHolder msgID = new IntHolder(0); protected Thread listenerThread; 93 94 protected BroadcastMessageHandler(String ipaddress, int port) throws Exception { 95 this(ipaddress, port, new DatagramSocket (port, InetAddress.getByName(ipaddress)), false); 96 sock.setBroadcast(true); 97 98 listenerThread.start(); 99 } 100 101 protected BroadcastMessageHandler(String ipaddress, int port, DatagramSocket sock) throws Exception { 102 this(ipaddress, port, sock, true); 103 } 104 105 protected BroadcastMessageHandler(String ipaddress, int port, DatagramSocket sock, boolean start) throws Exception { 106 setAddress(InetAddress.getByName(ipaddress), port); 107 this.sock = sock; 108 109 maxbuf = sock.getSendBufferSize() - (HEADER_SIZE + SPACER_SIZE); 110 receiveBufSize = sock.getReceiveBufferSize(); 111 112 listenerThread = new Thread (this); 113 114 if (start) { 115 listenerThread.start(); 116 } 117 } 118 119 private final boolean isLocal(InetAddress addr) { 120 byte[] b = addr.getAddress(); 121 return (Arrays.equals(b, localhost1) || Arrays.equals(b, localhost2)); 122 } 123 124 127 public void run() { 128 if (log.isDebugEnabled()) { 129 log.debug("packet listener running"); 130 } 131 byte[] buf = new byte[receiveBufSize]; 132 DatagramPacket packet = new DatagramPacket (buf, buf.length); 133 long time = System.currentTimeMillis(); 134 135 loop: while (running) { 136 try { 137 sock.receive(packet); 139 140 if (log.isDebugEnabled()) { 141 log.debug("received packet"); 142 } 143 144 if (!isLocal(packet.getAddress())) { 145 Object obj = PacketUtils.reconstitute(packet, receiveBufSize); 146 if (obj != null) { 147 Message msg = (Message)obj; 148 if (log.isDebugEnabled()) { 149 log.debug("received event with group " + msg.getGroup()); 150 } 151 152 notifyListeners(msg); 153 } 154 } 155 packet.setLength(buf.length); 157 } 158 catch (Exception e) { 159 e.printStackTrace(); 160 } 161 162 PacketUtils.dumpOldData(); 163 try { 164 Thread.sleep(10); 165 Thread.yield(); 166 } 167 catch (InterruptedException ie) { } 168 } 169 } 170 171 174 public void send(Message msg) throws IOException { 175 if (!willTransmit(msg.getGroup())) { 176 return; 177 } 178 else if (log.isDebugEnabled()) { 179 log.debug("sending message " + msg.toString()); 180 } 181 182 try { 183 DatagramPacket [] packets = PacketUtils.split(msg, addr, port, maxbuf); 184 for (int i = 0; i < packets.length; i++) { 185 if (log.isDebugEnabled()) { 186 log.debug("sending packet : " + (i + 1) + " of " + packets.length); 187 } 188 sock.send(packets[i]); 189 } 190 } 191 catch (Exception e) { 192 log.error(e); 193 } 194 } 195 } | Popular Tags |