1 22 package org.xsocket.stream.io.impl; 23 24 import java.io.IOException ; 25 import java.nio.ByteBuffer ; 26 import java.util.LinkedList ; 27 import java.util.TimerTask ; 28 import java.util.logging.Level ; 29 import java.util.logging.Logger ; 30 31 import org.xsocket.DataConverter; 32 import org.xsocket.stream.INonBlockingConnection; 33 import org.xsocket.stream.io.spi.IIoHandlerCallback; 34 35 36 37 38 43 final class IoThrottledWriteHandler extends ChainableIoHandler { 44 45 private static final Logger LOG = Logger.getLogger(IoThrottledWriteHandler.class.getName()); 46 47 48 private final LinkedList <DelayQueueEntry> sendQueue = new LinkedList <DelayQueueEntry>(); 50 51 52 53 private int sendBytesPerSec = INonBlockingConnection.UNLIMITED; 55 private TimerTask delayedDelivererTask = null; 56 57 58 59 public void init(IIoHandlerCallback callbackHandler) throws IOException { 60 setPreviousCallback(callbackHandler); 61 getSuccessor().init(callbackHandler); 62 } 63 64 65 66 70 IoThrottledWriteHandler(ChainableIoHandler successor) { 71 super(successor); 72 } 73 74 75 80 void setWriteRateSec(int writeRateSec) { 81 this.sendBytesPerSec = writeRateSec; 82 } 83 84 85 86 87 90 @Override 91 public int getPendingWriteDataSize() { 92 return getSendQueueSize() + super.getPendingWriteDataSize(); 93 } 94 95 96 @SuppressWarnings ("unchecked") 97 private int getSendQueueSize() { 98 int size = 0; 99 100 LinkedList <DelayQueueEntry> copy = null; 101 synchronized (sendQueue) { 102 copy = (LinkedList <DelayQueueEntry>) sendQueue.clone(); 103 } 104 105 for (DelayQueueEntry entry : copy) { 106 size += entry.buffer.remaining(); 107 } 108 109 return size; 110 } 111 112 113 116 public LinkedList <ByteBuffer > drainIncoming() { 117 return getSuccessor().drainIncoming(); 118 } 119 120 121 124 public void close(boolean immediate) throws IOException { 125 if (!immediate) { 126 flushOutgoing(); 127 } 128 129 getSuccessor().close(immediate); 130 } 131 132 133 136 public void writeOutgoing(ByteBuffer buffer) { 137 138 int size = buffer.remaining(); 140 if (size > 0) { 141 142 DelayQueueEntry delayQueueEntry = new DelayQueueEntry(buffer.duplicate(), sendBytesPerSec); 143 144 if (LOG.isLoggable(Level.FINE)) { 145 LOG.fine("[" + getId() + "] add " + delayQueueEntry + " to delay queue"); 146 } 147 synchronized (sendQueue) { 148 sendQueue.offer(delayQueueEntry); 149 } 150 } 151 152 if (delayedDelivererTask == null) { 154 int period = 500; 155 156 if (LOG.isLoggable(Level.FINE)) { 157 LOG.fine("[" + getId() + "] delay delivery task is null. Starting task (period=" + DataConverter.toFormatedDuration(period) + ")"); 158 } 159 delayedDelivererTask = new DeliveryTask(); 160 IoProvider.getTimer().schedule(delayedDelivererTask, 0, 500); 161 } 162 163 } 164 165 166 169 public void writeOutgoing(LinkedList <ByteBuffer > buffers) { 170 for (ByteBuffer buffer : buffers) { 171 writeOutgoing(buffer); 172 } 173 } 174 175 176 177 180 public void flushOutgoing() throws IOException { 181 if (LOG.isLoggable(Level.FINE)) { 182 LOG.fine("flush remaning data"); 183 } 184 185 186 synchronized (sendQueue) { 187 if (!sendQueue.isEmpty()) { 188 DelayQueueEntry[] entries = sendQueue.toArray(new DelayQueueEntry[sendQueue.size()]); 189 sendQueue.clear(); 190 191 ByteBuffer [] buffers = new ByteBuffer [entries.length]; 192 for (int i = 0; i < buffers.length; i++) { 193 buffers[i] = entries[i].getBuffer(); 194 } 195 196 if (LOG.isLoggable(Level.FINE)) { 197 LOG.fine("[" + getId() + "] flushing " + buffers.length + " buffers of delay queue"); 198 } 199 200 for (ByteBuffer buffer : buffers) { 201 try { 202 IoThrottledWriteHandler.this.getSuccessor().writeOutgoing(buffer); 203 } catch (Exception e) { 204 if (LOG.isLoggable(Level.FINE)) { 205 LOG.fine("[" + getId() + "] error occured while writing. Reason: " + e.toString()); 206 } 207 } 208 } 209 } 210 } 211 212 getSuccessor().flushOutgoing(); 213 } 214 215 216 private final class DeliveryTask extends TimerTask { 217 218 @Override 219 public void run() { 220 synchronized(sendQueue) { 221 222 long currentTime = System.currentTimeMillis(); 223 while(!sendQueue.isEmpty()) { 224 try { 225 226 DelayQueueEntry qe = sendQueue.peek(); 228 int remaingSize = qe.write(currentTime); 229 230 if (remaingSize == 0) { 232 sendQueue.remove(qe); 233 234 if (LOG.isLoggable(Level.FINE)) { 235 LOG.fine("throttling write queue is emtpy"); 236 } 237 238 239 } else { 241 break; 242 } 243 244 } catch (Throwable e) { 245 if (LOG.isLoggable(Level.FINE)) { 246 LOG.fine("[" + getId() + "] Error occured while write delayed. Reason: " + e.toString()); 247 } 248 } 249 } 250 } 251 } 252 } 253 254 255 private final class DelayQueueEntry { 256 private ByteBuffer buffer = null; 257 private int bytesPerSec = 0; 258 private long lastWriteTime = 0; 259 260 261 DelayQueueEntry(ByteBuffer buffer, int bytesPerSec) { 262 this.buffer = buffer; 263 this.bytesPerSec = bytesPerSec; 264 this.lastWriteTime = System.currentTimeMillis(); 265 } 266 267 268 ByteBuffer getBuffer() { 269 return buffer; 270 } 271 272 273 int write(long currentTime) throws IOException { 274 int remaingSize = buffer.remaining(); 275 276 long elapsedTimeMillis = currentTime - lastWriteTime; 277 278 if (elapsedTimeMillis > 0) { 279 int elapsedTimeSec = ((int) (elapsedTimeMillis)) / 1000; 280 281 if (elapsedTimeSec > 0) { 282 int sizeToWrite = bytesPerSec * elapsedTimeSec; 283 284 if (sizeToWrite > 0) { 285 ByteBuffer bytesToWrite = null; 286 if (buffer.remaining() <= sizeToWrite) { 287 bytesToWrite = buffer; 288 remaingSize = 0; 289 290 } else { 291 int saveLimit = buffer.limit(); 292 buffer.limit(sizeToWrite); 293 bytesToWrite = buffer.slice(); 294 buffer.position(buffer.limit()); 295 buffer.limit(saveLimit); 296 buffer = buffer.slice(); 297 remaingSize = buffer.remaining(); 298 } 299 300 lastWriteTime = currentTime; 301 if (LOG.isLoggable(Level.FINE)) { 302 LOG.fine("[" + getId() + "] release " + sizeToWrite + " bytes from delay queue"); 303 } 304 getSuccessor().writeOutgoing(bytesToWrite); 305 } 306 } 307 } 308 309 return remaingSize; 310 } 311 312 313 314 @Override 315 public String toString() { 316 return "buffer " + DataConverter.toFormatedBytesSize(buffer.remaining()) + " (write rate " + bytesPerSec + " bytes/sec)"; 317 } 318 } 319 320 321 322 325 @Override 326 public String toString() { 327 try { 328 return this.getClass().getSimpleName() + "(pending delayQueueSize=" + DataConverter.toFormatedBytesSize(getPendingWriteDataSize()) + ") ->" + "\r\n" + getSuccessor().toString(); 329 } catch (Exception e) { 330 return super.toString(); 331 } 332 } 333 334 } 335 | Popular Tags |