package org.xsocket.datagram;


import java.io.IOException;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.net.SocketAddress;
import java.net.SocketException;
import java.net.SocketOptions;
import java.net.SocketTimeoutException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.logging.Level;
import java.util.logging.Logger;

import org.xsocket.DataConverter;
import org.xsocket.IWorkerPool;


/**
 * Base class for datagram endpoints.
 *
 * <p>It assigns each endpoint an id, queues datagrams handed in through
 * {@link #onData(SocketAddress, ByteBuffer)}, lets callers fetch them through
 * {@link #receive()} / {@link #receive(long)}, and, when a handler is set,
 * dispatches a notification task to the worker pool for every incoming datagram.
 */
abstract class AbstractEndpoint implements IEndpoint {

    private static final Logger LOG = Logger.getLogger(AbstractEndpoint.class.getName());

    /** Shared executor handed out by {@link #getGlobalWorkerPool()}. */
    private static final Executor GLOBAL_WORKERPOOL = Executors.newCachedThreadPool();

    /** Prefix shared by the ids of all endpoints created by this class. */
    private static final String ID_PREFIX = createIdPrefix();

    /** Last sequence number handed out; only touched through {@link #nextSequenceNumber()}. */
    private static long nextId = 0;


    // ids
    private String id = null;

    // encoding
    private String defaultEncoding = "UTF-8";

    // receive data handling
    private final Object readGuard = new Object();
    private final ReceiveQueue receiveQueue = new ReceiveQueue();
    private int receiveSize = -1;

    // datagram handler
    private IDatagramHandler datagramHandler = null;

    // worker pool
    private Executor workerPool = null;

    // statistics & jmx
    private long openTime = -1;
    private long lastTimeReceived = System.currentTimeMillis();
    private long receivedBytes = 0;

    private long handleIncomingDatagrams = 0;
    private long handleOutgoingDatagrams = 0;


    /**
     * Builds the id prefix from the hash of the local host name, the current
     * time and a non-negative random number, all rendered as hex.
     */
    private static String createIdPrefix() {
        String base;
        try {
            base = InetAddress.getLocalHost().getCanonicalHostName();
        } catch (Exception e) {
            // host name cannot be resolved: fall back to a fixed token
            base = "locale";
        }

        // clearing the sign bit yields a uniformly distributed non-negative int
        int random = new Random().nextInt() & Integer.MAX_VALUE;

        return Integer.toHexString(base.hashCode())
                + "." + Long.toHexString(System.currentTimeMillis())
                + "." + Integer.toHexString(random);
    }


    /**
     * Returns the next endpoint sequence number. Synchronized so that endpoints
     * created concurrently never receive the same id.
     */
    private static synchronized long nextSequenceNumber() {
        return ++nextId;
    }


    /**
     * Constructor.
     *
     * @param datagramHandler the handler notified about incoming datagrams, or <code>null</code>
     * @param receiveSize     the receive packet size (<code>-1</code> means "not set")
     * @param workerPool      the executor used to run the handler notification
     */
    AbstractEndpoint(IDatagramHandler datagramHandler, int receiveSize, Executor workerPool) {
        this.datagramHandler = datagramHandler;
        this.receiveSize = receiveSize;
        this.workerPool = workerPool;

        id = ID_PREFIX + "." + nextSequenceNumber();

        // NOTE(review): the hook is never removed, so it keeps a reference to
        // this endpoint for the remaining lifetime of the JVM.
        Runtime.getRuntime().addShutdownHook(new Thread() {
            @Override
            public void run() {
                close();
            }
        });

        openTime = System.currentTimeMillis();
    }


    /**
     * Returns the executor shared by all endpoints.
     *
     * @return the global worker pool
     */
    protected static Executor getGlobalWorkerPool() {
        return GLOBAL_WORKERPOOL;
    }


    /**
     * Closes the endpoint. This base implementation does nothing.
     */
    public void close() {

    }


    /**
     * Returns a {@link SocketOptions} view which delegates reading and writing
     * of options to {@link DatagramSocketConfiguration} for the given socket.
     *
     * @param socket the socket the options belong to
     * @return the socket options view
     */
    final SocketOptions getSocketOptions(final DatagramSocket socket) {

        return new SocketOptions() {

            public Object getOption(int optID) throws SocketException {
                return DatagramSocketConfiguration.getOption(socket, optID);
            }

            public void setOption(int optID, Object value) throws SocketException {
                DatagramSocketConfiguration.setOption(socket, optID, value);
            }

            @Override
            public String toString() {
                try {
                    return "TCP_NODELAY=" + getOption(TCP_NODELAY) + ", "
                         + "SO_TIMEOUT=" + getOption(SO_TIMEOUT) + ", "
                         + "SO_SNDBUF=" + getOption(SO_SNDBUF) + ", "
                         + "SO_REUSEADDR=" + getOption(SO_REUSEADDR) + ", "
                         + "SO_RCVBUF=" + getOption(SO_RCVBUF) + ", "
                         + "IP_TOS=" + getOption(IP_TOS) + ", ";
                } catch (Exception e) {
                    // any option that cannot be read falls back to the default representation
                    return super.toString();
                }
            }
        };
    }


    /**
     * Sets the worker pool.
     *
     * @param workerPool the worker pool to use
     */
    public void setWorkerPool(IWorkerPool workerPool) {
        this.workerPool = workerPool;
    }


    /**
     * Returns the worker pool as {@link IWorkerPool}.
     *
     * <p>NOTE(review): this is an unchecked downcast; it throws a
     * {@link ClassCastException} when the endpoint was created with a plain
     * {@link Executor}. Use {@link #getWorkerpool()} to avoid the cast.
     *
     * @return the worker pool
     */
    public IWorkerPool getWorkerPool() {
        return (IWorkerPool) workerPool;
    }


    /**
     * Returns the worker pool.
     *
     * @return the executor used to run the handler notification
     */
    public Executor getWorkerpool() {
        return workerPool;
    }


    /**
     * Sets the receive packet size.
     *
     * @param receivePacketSize the receive packet size
     */
    public final void setReceiveSize(int receivePacketSize) {
        this.receiveSize = receivePacketSize;
    }


    /**
     * Returns the receive packet size.
     *
     * @return the receive packet size, or <code>-1</code> if it has not been set
     */
    public final int getReceiveSize() {
        return receiveSize;
    }


    /**
     * Accepts an incoming datagram: queues it, wakes up callers blocked in
     * {@link #receive(long)}, updates the statistics and, if a handler is set,
     * schedules the handler notification on the worker pool.
     *
     * @param address the address passed on to the created {@link UserDatagram}
     * @param data    the received data
     */
    protected final void onData(SocketAddress address, ByteBuffer data) {
        // Capture the size first: once the datagram is queued another thread
        // may already be reading from the buffer.
        int size = data.remaining();

        UserDatagram packet = new UserDatagram(address, data, getDefaultEncoding());
        receiveQueue.offer(packet);

        // wake up threads waiting in receive(long)
        synchronized (readGuard) {
            readGuard.notifyAll();
        }

        if (LOG.isLoggable(Level.FINE)) {
            LOG.fine("[" + "/:" + getLocalPort() + " " + getId() + "] datagram received: " + packet.toString());
        }

        handleIncomingDatagrams++;
        lastTimeReceived = System.currentTimeMillis();
        receivedBytes += size;

        if (datagramHandler != null) {
            workerPool.execute(new HandlerProcessor());
        }
    }


    /**
     * Fetches the next datagram, waiting up to the given time for one to arrive.
     *
     * <p>With a timeout of zero or less the queue is checked exactly once.
     *
     * @param timeoutMillis the maximum time to wait in milliseconds
     * @return the received datagram (never <code>null</code>)
     * @throws SocketTimeoutException if no datagram became available in time
     * @throws IOException if the receive packet size has not been set
     */
    public final UserDatagram receive(long timeoutMillis) throws IOException, SocketTimeoutException {
        if (getReceiveSize() == -1) {
            throw new IOException("the receive packet size hasn't been set");
        }

        UserDatagram datagram = null;

        if (timeoutMillis <= 0) {
            // no timeout set: single, non-blocking attempt
            datagram = receive();

        } else {
            long start = System.currentTimeMillis();

            // Keep polling in slices, as receive() may be overridden by a subclass.
            // The slice is at least 1 ms because wait(0) would block until
            // notified, i.e. potentially forever.
            long slice = Math.max(1, timeoutMillis / 10);
            boolean interrupted = false;

            synchronized (readGuard) {
                while (true) {
                    datagram = receive();
                    if (datagram != null) {
                        break;
                    }

                    // elapsed-time arithmetic cannot overflow for large timeouts
                    long remaining = timeoutMillis - (System.currentTimeMillis() - start);
                    if (remaining <= 0) {
                        break;
                    }

                    try {
                        readGuard.wait(Math.min(remaining, slice));
                    } catch (InterruptedException ie) {
                        // keep waiting for the datagram; the status is restored below
                        interrupted = true;
                    }
                }
            }

            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }

        if (datagram == null) {
            throw new SocketTimeoutException("timeout " + DataConverter.toFormatedDuration(timeoutMillis) + " reached");
        } else {
            return datagram;
        }
    }


    /**
     * Fetches the next queued datagram without waiting.
     *
     * @return the oldest queued datagram, or <code>null</code> if the queue is empty
     */
    public UserDatagram receive() {
        return receiveQueue.poll();
    }


    /**
     * Returns the default encoding.
     *
     * @return the encoding passed to newly created datagrams
     */
    public final String getDefaultEncoding() {
        return defaultEncoding;
    }


    /**
     * Sets the default encoding.
     *
     * @param defaultEncoding the encoding passed to newly created datagrams
     */
    public final void setDefaultEncoding(String defaultEncoding) {
        this.defaultEncoding = defaultEncoding;
    }


    /**
     * Increments the counter of handled outgoing datagrams.
     */
    protected final void incNumberOfHandledOutgoingDatagram() {
        handleOutgoingDatagrams++;
    }


    /**
     * Returns the endpoint id.
     *
     * @return the id assigned at construction time
     */
    public final String getId() {
        return id;
    }


    /**
     * Returns a summary with the received byte count, the endpoint age,
     * the time of the last reception and the id.
     */
    @Override
    public String toString() {
        return " received=" + DataConverter.toFormatedBytesSize(receivedBytes)
             + ", age=" + DataConverter.toFormatedDuration(System.currentTimeMillis() - openTime)
             + ", lastReceived=" + DataConverter.toFormatedDate(lastTimeReceived)
             + " [" + id + "]";
    }


    /**
     * FIFO queue of received datagrams; the mutating and checking methods are
     * synchronized on the queue instance.
     */
    private static final class ReceiveQueue {
        private final List<UserDatagram> receiveQueue = new ArrayList<UserDatagram>();
        private int modifyVersion = 0;

        /** Appends a datagram to the end of the queue. */
        public synchronized void offer(UserDatagram userDatagram) {
            modifyVersion++;
            receiveQueue.add(userDatagram);
        }

        /** Removes and returns the first datagram, or <code>null</code> if the queue is empty. */
        public synchronized UserDatagram poll() {
            if (receiveQueue.isEmpty()) {
                return null;
            } else {
                modifyVersion++;
                return receiveQueue.remove(0);
            }
        }

        /**
         * Returns <code>true</code> if no datagram is queued.
         *
         * <p>NOTE(review): this also bumps the modify version although the
         * queue content is not changed; kept as in the original.
         */
        public synchronized boolean isEmpty() {
            modifyVersion++;
            return receiveQueue.isEmpty();
        }

        /** Returns the current modify version (read without synchronization). */
        public int getModifyVersion() {
            return modifyVersion;
        }

        @Override
        public String toString() {
            return receiveQueue.size() + " (modifyVersion=" + modifyVersion + ")";
        }
    }


    /**
     * Worker pool task which notifies the datagram handler as long as the
     * receive queue is not empty. Errors raised by the handler are logged at
     * FINE level and not propagated.
     */
    private final class HandlerProcessor implements Runnable {

        public void run() {

            try {
                if (!receiveQueue.isEmpty()) {
                    datagramHandler.onDatagram(AbstractEndpoint.this);
                }

            } catch (Throwable e) {
                if (LOG.isLoggable(Level.FINE)) {
                    LOG.fine("error occured by performing onData task. Reason: " + e.toString());
                }
            }
        }
    }
}