1 22 package org.xsocket.datagram; 23 24 import java.io.IOException ; 25 import java.net.DatagramPacket ; 26 import java.net.InetAddress ; 27 import java.net.InetSocketAddress ; 28 import java.net.MulticastSocket ; 29 import java.net.SocketAddress ; 30 import java.net.SocketOptions ; 31 import java.nio.ByteBuffer ; 32 import java.util.Collections ; 33 import java.util.HashMap ; 34 import java.util.Map ; 35 import java.util.concurrent.Executor ; 36 import java.util.logging.Level ; 37 import java.util.logging.Logger ; 38 39 40 import org.xsocket.ClosedConnectionException; 41 42 43 44 52 public final class MulticastEndpoint extends AbstractEndpoint implements IConnectedEndpoint { 53 54 57 58 59 private static Logger LOG = Logger.getLogger(MulticastEndpoint.class.getName()); 60 61 62 private static final Map <String ,Class > SUPPORTED_OPTIONS = new HashMap <String , Class >(); 63 64 static { 65 SUPPORTED_OPTIONS.put(SO_RCVBUF, Integer .class); 66 SUPPORTED_OPTIONS.put(SO_SNDBUF, Integer .class); 67 SUPPORTED_OPTIONS.put(IP_TOS, Integer .class); 68 SUPPORTED_OPTIONS.put(SO_REUSEADDR, Boolean .class); 69 SUPPORTED_OPTIONS.put(SO_REUSEADDR, Boolean .class); 70 SUPPORTED_OPTIONS.put(IP_MULTICAST_TTL, Integer .class); 71 SUPPORTED_OPTIONS.put(IP_MULTICAST_LOOP, Boolean .class); 72 } 73 74 75 76 private boolean isRunning = true; 78 79 80 private MulticastSocket socket = null; 82 private InetSocketAddress multicastAddress = null; 83 84 85 93 public MulticastEndpoint(InetAddress address, final int port) throws IOException { 94 this(address, port, null, 0, null); 95 } 96 97 98 101 public MulticastEndpoint(InetAddress address, final int port, DatagramSocketConfiguration socketConfiguration) throws IOException { 102 this(address, port, socketConfiguration, 0, null); 103 } 104 105 106 107 117 public MulticastEndpoint(String address, final int port, int receiveSize, IDatagramHandler datagramHandler) throws IOException { 118 this(InetAddress.getByName(address), port, new DatagramSocketConfiguration(), receiveSize, datagramHandler); 119 } 120 121 122 125 public MulticastEndpoint(String address, final int port, DatagramSocketConfiguration socketConfiguration, int receiveSize, IDatagramHandler datagramHandler) throws IOException { 126 this(InetAddress.getByName(address), port, socketConfiguration, receiveSize, datagramHandler); 127 } 128 129 130 131 141 public MulticastEndpoint(InetAddress address, final int port, int receiveSize, IDatagramHandler datagramHandler) throws IOException { 142 this(address, port, new DatagramSocketConfiguration(), receiveSize, datagramHandler); 143 } 144 145 146 157 public MulticastEndpoint(InetAddress address, final int port, int receiveSize, IDatagramHandler datagramHandler, Executor workerPool) throws IOException { 158 this(address, port, new DatagramSocketConfiguration(), receiveSize, datagramHandler, workerPool); 159 } 160 161 162 165 public MulticastEndpoint(InetAddress address, int port, DatagramSocketConfiguration socketConfiguration, int receiveSize, IDatagramHandler datagramHandler) throws IOException { 166 this(address, port, socketConfiguration, receiveSize, datagramHandler, getGlobalWorkerPool()); 167 } 168 169 170 173 public MulticastEndpoint(InetAddress address, int port, DatagramSocketConfiguration socketConfiguration, int receiveSize, IDatagramHandler datagramHandler, Executor workerPool) throws IOException { 174 this(address, port, socketConfiguration.toOptions(), receiveSize, datagramHandler, workerPool); 175 } 176 177 189 public MulticastEndpoint(InetAddress address, int port, Map <String , Object > options, int receiveSize, IDatagramHandler datagramHandler, Executor workerPool) throws IOException { 190 super(datagramHandler, receiveSize, workerPool); 191 192 socket = new MulticastSocket (port); 193 194 for (String name : options.keySet()) { 195 setOption(name, options.get(name)); 196 } 197 198 socket.joinGroup(address); 199 multicastAddress = new InetSocketAddress (address, port); 200 201 if (datagramHandler != null) { 202 startReceiver(); 203 } 204 205 if (LOG.isLoggable(Level.FINE)) { 206 LOG.fine("upd multicast endpoint bound to " + address.getCanonicalHostName() + "/" + port); 207 } 208 } 209 210 211 212 215 public SocketOptions getSocketOptions() { 216 return getSocketOptions(socket); 217 } 218 219 220 private void startReceiver() { 221 Thread receiverThread = new Thread () { 222 public void run() { 223 while (isRunning) { 224 receiveData(); 225 } 226 } 227 }; 228 229 receiverThread.setDaemon(true); 230 receiverThread.setName("MulticastReceiver#" + hashCode()); 231 receiverThread.start(); 232 } 233 234 235 236 private void receiveData() { 237 try { 238 byte[] buf = new byte[getReceiveSize()]; 239 DatagramPacket dp = new DatagramPacket (buf, buf.length); 240 socket.receive(dp); 241 242 ByteBuffer data = ByteBuffer.wrap(dp.getData()); 243 data.limit(dp.getLength()); 245 onData(new InetSocketAddress (dp.getAddress(), dp.getPort()), data); 246 247 } catch(IOException e) { 248 if (!socket.isClosed()) { 249 if (LOG.isLoggable(Level.FINE)) { 250 LOG.fine("error occured by receiving data. Reason: " + e.toString()); 251 } 252 } 253 } 254 } 255 256 257 258 261 @Override  262 public String toString() { 263 return multicastAddress.toString() + " (ID=" + getId() + ")"; 264 } 265 266 267 270 public void close() { 271 if (isRunning) { 272 isRunning = false; 273 274 try { 275 socket.leaveGroup(multicastAddress.getAddress()); 276 socket.close(); 277 } catch (Exception e) { 278 if (LOG.isLoggable(Level.FINE)) { 279 LOG.fine("error occured by closing multicast socket. Reason: " + e.toString()); 280 } 281 } 282 283 super.close(); 284 } 285 } 286 287 288 291 public SocketAddress getLocalSocketAddress() { 292 return multicastAddress; 293 } 294 295 296 299 public InetAddress getLocalAddress() { 300 return multicastAddress.getAddress(); 301 } 302 303 304 307 public int getLocalPort() { 308 return multicastAddress.getPort(); 309 } 310 311 312 315 public SocketAddress getRemoteSocketAddress() { 316 return multicastAddress; 317 } 318 319 320 323 public boolean isOpen() { 324 return !socket.isClosed(); 325 } 326 327 328 331 public void send(UserDatagram packet) throws ClosedConnectionException, IOException { 332 if (LOG.isLoggable(Level.FINER)) { 333 LOG.finer("[" + "/:" + getLocalPort() + " " + getId() + "] sending datagram " + packet.toString()); 334 } 335 336 packet.prepareForSend(); 337 338 byte[] bytes = new byte[packet.getData().remaining()]; 339 packet.getData().get(bytes); 340 DatagramPacket dataPacket = new DatagramPacket (bytes, bytes.length, multicastAddress); 341 socket.send(dataPacket); 342 343 incNumberOfHandledOutgoingDatagram(); 344 } 345 346 347 350 protected MulticastEndpoint setOption(String name, Object value) throws IOException { 351 352 if (name.equals(IEndpoint.SO_SNDBUF)) { 353 socket.setSendBufferSize((Integer ) value); 354 355 } else if (name.equals(IEndpoint.SO_REUSEADDR)) { 356 socket.setReuseAddress((Boolean ) value); 357 358 } else if (name.equals(IEndpoint.SO_RCVBUF)) { 359 socket.setReceiveBufferSize((Integer ) value); 360 361 } else if (name.equals(IEndpoint.IP_TOS)) { 362 socket.setTrafficClass((Integer ) value); 363 364 } else if (name.equals(IEndpoint.IP_MULTICAST_TTL)) { 365 socket.setTimeToLive((Integer ) value); 366 367 } else if (name.equals(IEndpoint.IP_MULTICAST_LOOP)) { 368 socket.setLoopbackMode((Boolean ) value); 369 370 } else { 371 LOG.warning("option " + name + " is not supproted for " + this.getClass().getName()); 372 } 373 374 return this; 375 } 376 377 380 public Object getOption(String name) throws IOException { 381 382 if (name.equals(IEndpoint.SO_SNDBUF)) { 383 return socket.getSendBufferSize(); 384 385 } else if (name.equals(IEndpoint.SO_REUSEADDR)) { 386 return socket.getReuseAddress(); 387 388 } else if (name.equals(IEndpoint.SO_RCVBUF)) { 389 return socket.getReceiveBufferSize(); 390 391 } else if (name.equals(IEndpoint.IP_TOS)) { 392 return socket.getTrafficClass(); 393 394 } else if (name.equals(IEndpoint.IP_MULTICAST_TTL)) { 395 return socket.getTimeToLive(); 396 397 } else if (name.equals(IEndpoint.IP_MULTICAST_LOOP)) { 398 return socket.getLoopbackMode(); 399 400 } else { 401 LOG.warning("option " + name + " is not supproted for " + this.getClass().getName()); 402 return null; 403 } 404 } 405 406 public Map <String , Class > getOptions() { 407 return Collections.unmodifiableMap(SUPPORTED_OPTIONS); 408 } 409 } 410 | Popular Tags |