1 22 package org.xsocket.datagram; 23 24 import java.io.IOException ; 25 import java.net.DatagramSocket ; 26 import java.net.InetAddress ; 27 import java.net.InetSocketAddress ; 28 import java.net.SocketAddress ; 29 import java.net.SocketOptions ; 30 import java.nio.ByteBuffer ; 31 import java.nio.ByteOrder ; 32 import java.nio.channels.DatagramChannel ; 33 import java.nio.channels.SelectableChannel ; 34 import java.nio.channels.SelectionKey ; 35 import java.util.Collections ; 36 import java.util.HashMap ; 37 import java.util.LinkedList ; 38 import java.util.List ; 39 import java.util.Map ; 40 import java.util.Map.Entry; 41 import java.util.concurrent.Executor ; 42 import java.util.logging.Level ; 43 import java.util.logging.Logger ; 44 45 import org.xsocket.Dispatcher; 46 import org.xsocket.IEventHandler; 47 import org.xsocket.IHandle; 48 49 50 51 52 53 54 59 abstract class AbstractChannelBasedEndpoint extends AbstractEndpoint implements IEndpoint { 60 61 private static final Logger LOG = Logger.getLogger(AbstractChannelBasedEndpoint.class.getName()); 62 63 private static final MemoryManager memoryManager = new MemoryManager(65536, false); 64 private static Dispatcher<DispatcherHandle> dispatcher = createDispatcher(); 65 66 private static final Map <String ,Class > SUPPORTED_OPTIONS = new HashMap <String , Class >(); 67 68 static { 69 SUPPORTED_OPTIONS.put(SO_RCVBUF, Integer .class); 70 SUPPORTED_OPTIONS.put(SO_SNDBUF, Integer .class); 71 SUPPORTED_OPTIONS.put(IP_TOS, Integer .class); 72 SUPPORTED_OPTIONS.put(SO_REUSEADDR, Boolean .class); 73 } 74 75 76 77 private DatagramSocket socket = null; 79 private DatagramChannel channel = null; 80 private ByteOrder byteOrder = ByteOrder.BIG_ENDIAN; 81 82 83 private final List <UserDatagram> sendQueue = Collections.synchronizedList(new LinkedList <UserDatagram>()); 85 86 87 private DispatcherHandle dispatcherHandle = null; 88 89 90 91 102 AbstractChannelBasedEndpoint(InetAddress address, int port, Map <String , Object > options, IDatagramHandler datagramHandler, int receivePacketSize, Executor workerPool) throws IOException { 103 super(datagramHandler, receivePacketSize, workerPool); 104 105 channel = DatagramChannel.open(); 106 channel.configureBlocking(false); 107 108 socket = channel.socket(); 109 110 for (Entry<String , Object > entry : options.entrySet()) { 111 setOption(entry.getKey(), entry.getValue()); 112 } 113 114 InetSocketAddress addr = new InetSocketAddress (address, port); 115 socket.bind(addr); 116 117 dispatcherHandle = new DispatcherHandle(this); 118 dispatcher.register(dispatcherHandle, SelectionKey.OP_READ); 119 120 logFine("enpoint has been bound to locale port " + getLocalPort() + " (server mode)"); 121 } 122 123 124 127 final SocketOptions getSocketOptions() { 128 return getSocketOptions(channel.socket()); 129 } 130 131 132 @SuppressWarnings ("unchecked") 133 private static Dispatcher<DispatcherHandle> createDispatcher() { 134 Dispatcher<DispatcherHandle> disp = new Dispatcher<DispatcherHandle>(new DispatcherEventHandler()); 135 Thread t = new Thread (disp); 136 t.setName("DispatcherThread#" + disp.hashCode()); 137 t.setDaemon(true); 138 t.start(); 139 140 return disp; 141 } 142 143 144 protected final DatagramChannel getChannel() { 145 return channel; 146 } 147 148 149 150 151 154 public final void close() { 155 if (isOpen()) { 156 try { 157 logFine("closing " + toCompactString()); 158 channel.close(); 159 } catch (IOException ioe) { 160 logFine("error occured by closing connection. Reason " + ioe.toString()); 161 } 162 163 super.close(); 164 } 165 } 166 167 168 171 public final SocketAddress getLocalSocketAddress() { 172 return socket.getLocalSocketAddress(); 173 } 174 175 176 179 public final InetAddress getLocalAddress() { 180 return socket.getLocalAddress(); 181 } 182 183 184 187 public final int getLocalPort() { 188 return socket.getLocalPort(); 189 } 190 191 192 195 public final boolean isOpen() { 196 return channel.isOpen(); 197 } 198 199 200 201 206 private void logFine(String msg) { 207 if (LOG.isLoggable(Level.FINE)) { 208 LOG.fine("[" + "/:" + getLocalPort() + " " + getId() + "] " + msg); 209 } 210 } 211 212 213 216 public void send(UserDatagram packet) throws IOException { 217 if (packet.getRemoteAddress() == null) { 218 throw new IOException ("remote socket adress has to be set"); 219 } 220 221 logFine("add datagram packet (" + packet + ") to write queue"); 222 223 packet.prepareForSend(); 224 225 sendQueue.add(packet); 226 logFine("update interest ops to write"); 227 updateInteresSet(SelectionKey.OP_WRITE); 228 } 229 230 231 232 236 private void writePhysical() { 237 if (!sendQueue.isEmpty()) { 238 synchronized(sendQueue) { 239 for (UserDatagram packet : sendQueue) { 240 try { 241 if (LOG.isLoggable(Level.FINE)) { 242 LOG.fine("[" + "/:" + getLocalPort() + " " + getId() + "] sending datagram " + packet.toString()); 243 } 244 245 int dataToSend = packet.getSize(); 246 int written = channel.send(packet.getData(), packet.getRemoteSocketAddress()); 247 248 if (LOG.isLoggable(Level.FINE)) { 249 if (dataToSend != written) { 250 LOG.fine("Error occured by sending datagram. Size DataToSend=" + dataToSend + ", written=" + written); 251 } 252 } 253 254 incNumberOfHandledOutgoingDatagram(); 255 } catch (IOException ioe) { 256 LOG.warning("couldn't write datagram to " + packet.getRemoteAddress() + " .Reason: " + ioe.toString()); 257 } 258 } 259 260 sendQueue.clear(); 261 } 262 } 263 } 264 265 266 private void updateInteresSet(int intOps) throws IOException { 267 dispatcher.updateInterestSet(dispatcherHandle, intOps); 268 } 269 270 271 274 public String toCompactString() { 275 return this.getClass().getSimpleName() + " " + socket.getLocalAddress().getCanonicalHostName() + ":" + getLocalPort(); 276 } 277 278 279 280 private final static class DispatcherHandle implements IHandle { 281 282 private AbstractChannelBasedEndpoint endpoint = null; 283 284 DispatcherHandle(AbstractChannelBasedEndpoint endpoint) { 285 this.endpoint = endpoint; 286 } 287 288 public SelectableChannel getChannel() { 289 return endpoint.channel; 290 } 291 } 292 293 294 295 296 private final static class DispatcherEventHandler<T extends IEndpoint> implements IEventHandler<DispatcherHandle> { 297 298 299 302 public void onHandleRegisterEvent(DispatcherHandle handle) throws IOException { 303 304 } 305 306 307 310 @SuppressWarnings ("unchecked") 311 public void onHandleReadableEvent(final DispatcherHandle handle) { 312 if (handle.endpoint.isOpen()) { 313 314 try { 315 if (handle.endpoint.getReceiveSize() > 0) { 317 ByteBuffer readBuffer = memoryManager.acquireMemory(handle.endpoint.getReceiveSize()); 318 readBuffer.order(handle.endpoint.byteOrder); 319 SocketAddress address = handle.endpoint.channel.receive(readBuffer); 320 321 if (address == null) { 323 return; 324 325 } else { 327 328 if (readBuffer.position() == 0) { 330 return; 331 } 332 333 readBuffer.flip(); 334 handle.endpoint.onData(address, readBuffer); 335 } 336 } 337 } catch (IOException ioe) { 338 handle.endpoint.logFine("error occured while receiving. Reason: " + ioe.toString()); 339 } 340 } 341 } 342 343 344 347 public void onHandleWriteableEvent(DispatcherHandle handle) throws IOException { 348 handle.endpoint.writePhysical(); 349 handle.endpoint.updateInteresSet(SelectionKey.OP_READ); 350 } 351 352 353 356 public void onDispatcherCloseEvent(final DispatcherHandle handle) { 357 handle.endpoint.close(); 358 } 359 } 360 361 362 363 366 protected AbstractChannelBasedEndpoint setOption(String name, Object value) throws IOException { 367 368 if (name.equals(IEndpoint.SO_SNDBUF)) { 369 socket.setSendBufferSize((Integer ) value); 370 371 } else if (name.equals(IEndpoint.SO_REUSEADDR)) { 372 socket.setReuseAddress((Boolean ) value); 373 374 } else if (name.equals(IEndpoint.SO_RCVBUF)) { 375 socket.setReceiveBufferSize((Integer ) value); 376 377 } else if (name.equals(IEndpoint.IP_TOS)) { 378 socket.setTrafficClass((Integer ) value); 379 380 } else { 381 LOG.warning("option " + name + " is not supproted for " + this.getClass().getName()); 382 } 383 384 return this; 385 } 386 387 388 391 public Object getOption(String name) throws IOException { 392 393 if (name.equals(IEndpoint.SO_SNDBUF)) { 394 return socket.getSendBufferSize(); 395 396 } else if (name.equals(IEndpoint.SO_REUSEADDR)) { 397 return socket.getReuseAddress(); 398 399 } else if (name.equals(IEndpoint.SO_RCVBUF)) { 400 return socket.getReceiveBufferSize(); 401 402 } else if (name.equals(IEndpoint.IP_TOS)) { 403 return socket.getTrafficClass(); 404 405 } else { 406 LOG.warning("option " + name + " is not supproted for " + this.getClass().getName()); 407 return null; 408 } 409 } 410 411 public Map <String , Class > getOptions() { 412 return Collections.unmodifiableMap(SUPPORTED_OPTIONS); 413 } 414 } 415 | Popular Tags |