1 package org.sapia.ubik.mcast.server; 2 3 import java.io.EOFException ; 4 import java.io.IOException ; 5 import java.io.InterruptedIOException ; 6 7 import java.net.DatagramPacket ; 8 import java.net.InetAddress ; 9 import java.net.MulticastSocket ; 10 import java.net.SocketException ; 11 import org.sapia.ubik.mcast.ByteArrayPool; 12 import org.sapia.ubik.rmi.server.Log; 13 14 15 23 public abstract class MulticastServer extends Thread { 24 static final int DEFAULT_BUFSZ = 20000; 25 static final int TTL = 7; 26 protected MulticastSocket _sock; 27 private InetAddress _group; 28 private String _groupStr; 29 private int _port; 30 private ByteArrayPool _bytes = new ByteArrayPool(DEFAULT_BUFSZ); 31 32 35 public MulticastServer(String name, int soTimeout, String mcastAddress, 36 int mcastPort, int ttl) throws IOException { 37 super(name); 38 super.setDaemon(true); 39 _group = InetAddress.getByName(mcastAddress); 40 _groupStr = mcastAddress; 41 _sock = new MulticastSocket (mcastPort); 42 _sock.setSoTimeout(soTimeout); 43 _sock.setTimeToLive(ttl); 44 _sock.joinGroup(_group); 45 _port = mcastPort; 46 } 47 48 public MulticastServer(String name, int soTimeout, String mcastAddress, 49 int mcastPort) throws IOException { 50 this(name, soTimeout, mcastAddress, mcastPort, TTL); 51 } 52 53 public void setBufsize(int size) { 54 _bytes.setBufSize(size); 55 } 56 57 public String getMulticastAddress() { 58 return _groupStr; 59 } 60 61 public int getMulticastPort() { 62 return _port; 63 } 64 65 public void send(byte[] toSend) throws IOException { 66 if (_sock == null) { 67 throw new IllegalStateException ("Server not started"); 68 } 69 70 DatagramPacket pack = new DatagramPacket (toSend, toSend.length, _group, 71 _port); 72 _sock.send(pack); 73 } 74 75 public void close() { 76 try { 77 _sock.leaveGroup(_group); 78 } catch (IOException e) { 79 } 81 82 _sock.close(); 83 } 84 85 public void run() { 86 DatagramPacket pack = null; 87 88 while (true) { 89 byte[] bytes = null; 90 91 try{ 92 bytes = (byte[])_bytes.acquire(); 93 } catch (Exception e) { 94 Log.error(getClass(), "Could not acquire byte buffer"); 95 e.printStackTrace(); 96 break; 97 } 98 99 try { 100 pack = new DatagramPacket (bytes, bytes.length); 101 _sock.receive(pack); 102 handle(pack, _sock); 103 } catch (InterruptedIOException e) { 104 handleSoTimeout(); 105 } catch (EOFException e) { 106 handlePacketSizeToShort(pack); 107 } catch (SocketException e) { 108 if (_sock.isClosed()) { 109 break; 110 } 111 } catch (IOException e) { 112 e.printStackTrace(); 113 } finally { 114 _bytes.release(bytes); 115 } 116 } 117 } 118 119 protected int bufSize() { 120 return _bytes.getBufSize(); 121 } 122 123 protected abstract void handleSoTimeout(); 124 125 protected abstract void handlePacketSizeToShort(DatagramPacket pack); 126 127 protected abstract void handle(DatagramPacket pack, MulticastSocket sock); 128 } 129 | Popular Tags |