KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > xsocket > stream > io > impl > IoThrottledWriteHandler


1 // $Id: IoDelayWriteHandler.java 1316 2007-06-10 08:51:18Z grro $
2
/*
3  * Copyright (c) xsocket.org, 2006 - 2007. All rights reserved.
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Lesser General Public
7  * License as published by the Free Software Foundation; either
8  * version 2.1 of the License, or (at your option) any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13  * Lesser General Public License for more details.
14  *
15  * You should have received a copy of the GNU Lesser General Public
16  * License along with this library; if not, write to the Free Software
17  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
18  *
19  * Please refer to the LGPL license at: http://www.gnu.org/copyleft/lesser.txt
20  * The latest copy of this software may be found on http://www.xsocket.org/
21  */

22 package org.xsocket.stream.io.impl;
23
24 import java.io.IOException JavaDoc;
25 import java.nio.ByteBuffer JavaDoc;
26 import java.util.LinkedList JavaDoc;
27 import java.util.TimerTask JavaDoc;
28 import java.util.logging.Level JavaDoc;
29 import java.util.logging.Logger JavaDoc;
30
31 import org.xsocket.DataConverter;
32 import org.xsocket.stream.INonBlockingConnection;
33 import org.xsocket.stream.io.spi.IIoHandlerCallback;
34
35
36
37
38 /**
39  * Delayed write IO handler
40  *
41  * @author grro@xsocket.org
42  */

43 final class IoThrottledWriteHandler extends ChainableIoHandler {
44
45     private static final Logger JavaDoc LOG = Logger.getLogger(IoThrottledWriteHandler.class.getName());
46     
47     
48     // write queue
49
private final LinkedList JavaDoc<DelayQueueEntry> sendQueue = new LinkedList JavaDoc<DelayQueueEntry>();
50     
51     
52     
53     // timer handling
54
private int sendBytesPerSec = INonBlockingConnection.UNLIMITED;
55     private TimerTask JavaDoc delayedDelivererTask = null;
56
57     
58
59     public void init(IIoHandlerCallback callbackHandler) throws IOException JavaDoc {
60         setPreviousCallback(callbackHandler);
61         getSuccessor().init(callbackHandler);
62     }
63     
64     
65
66     /**
67      * constructor
68      * @param successor the successor
69      */

70     IoThrottledWriteHandler(ChainableIoHandler successor) {
71         super(successor);
72     }
73
74     
75     /**
76      * set the write rate in sec
77      *
78      * @param writeRateSec the write rate
79      */

80     void setWriteRateSec(int writeRateSec) {
81         this.sendBytesPerSec = writeRateSec;
82     }
83     
84     
85
86
87     /**
88      * {@inheritDoc}
89      */

90     @Override JavaDoc
91     public int getPendingWriteDataSize() {
92         return getSendQueueSize() + super.getPendingWriteDataSize();
93     }
94
95     
96     @SuppressWarnings JavaDoc("unchecked")
97     private int getSendQueueSize() {
98         int size = 0;
99     
100         LinkedList JavaDoc<DelayQueueEntry> copy = null;
101         synchronized (sendQueue) {
102             copy = (LinkedList JavaDoc<DelayQueueEntry>) sendQueue.clone();
103         }
104         
105         for (DelayQueueEntry entry : copy) {
106             size += entry.buffer.remaining();
107         }
108         
109         return size;
110     }
111     
112
113     /**
114      * {@inheritDoc}
115      */

116     public LinkedList JavaDoc<ByteBuffer JavaDoc> drainIncoming() {
117         return getSuccessor().drainIncoming();
118     }
119     
120     
121     /**
122      * {@inheritDoc}
123      */

124     public void close(boolean immediate) throws IOException JavaDoc {
125         if (!immediate) {
126             flushOutgoing();
127         }
128         
129         getSuccessor().close(immediate);
130     }
131     
132
133     /**
134      * {@inheritDoc}
135      */

136     public void writeOutgoing(ByteBuffer JavaDoc buffer) {
137
138         // append to delay queue
139
int size = buffer.remaining();
140         if (size > 0) {
141             
142             DelayQueueEntry delayQueueEntry = new DelayQueueEntry(buffer.duplicate(), sendBytesPerSec);
143             
144             if (LOG.isLoggable(Level.FINE)) {
145                 LOG.fine("[" + getId() + "] add " + delayQueueEntry + " to delay queue");
146             }
147             synchronized (sendQueue) {
148                 sendQueue.offer(delayQueueEntry);
149             }
150         }
151
152         // create delivery task if not exists
153
if (delayedDelivererTask == null) {
154             int period = 500;
155             
156             if (LOG.isLoggable(Level.FINE)) {
157                 LOG.fine("[" + getId() + "] delay delivery task is null. Starting task (period=" + DataConverter.toFormatedDuration(period) + ")");
158             }
159             delayedDelivererTask = new DeliveryTask();
160             IoProvider.getTimer().schedule(delayedDelivererTask, 0, 500);
161         }
162
163     }
164     
165     
166     /**
167      * {@inheritDoc}
168      */

169     public void writeOutgoing(LinkedList JavaDoc<ByteBuffer JavaDoc> buffers) {
170         for (ByteBuffer JavaDoc buffer : buffers) {
171             writeOutgoing(buffer);
172         }
173     }
174     
175
176     
177     /**
178      * {@inheritDoc}
179      */

180     public void flushOutgoing() throws IOException JavaDoc {
181         if (LOG.isLoggable(Level.FINE)) {
182             LOG.fine("flush remaning data");
183         }
184
185         
186         synchronized (sendQueue) {
187             if (!sendQueue.isEmpty()) {
188                 DelayQueueEntry[] entries = sendQueue.toArray(new DelayQueueEntry[sendQueue.size()]);
189                 sendQueue.clear();
190                                 
191                 ByteBuffer JavaDoc[] buffers = new ByteBuffer JavaDoc[entries.length];
192                 for (int i = 0; i < buffers.length; i++) {
193                     buffers[i] = entries[i].getBuffer();
194                 }
195                 
196                 if (LOG.isLoggable(Level.FINE)) {
197                     LOG.fine("[" + getId() + "] flushing " + buffers.length + " buffers of delay queue");
198                 }
199                 
200                 for (ByteBuffer JavaDoc buffer : buffers) {
201                     try {
202                         IoThrottledWriteHandler.this.getSuccessor().writeOutgoing(buffer);
203                     } catch (Exception JavaDoc e) {
204                         if (LOG.isLoggable(Level.FINE)) {
205                             LOG.fine("[" + getId() + "] error occured while writing. Reason: " + e.toString());
206                         }
207                     }
208                 }
209             }
210         }
211         
212         getSuccessor().flushOutgoing();
213     }
214
215     
216     private final class DeliveryTask extends TimerTask JavaDoc {
217
218         @Override JavaDoc
219         public void run() {
220             synchronized(sendQueue) {
221
222                 long currentTime = System.currentTimeMillis();
223                 while(!sendQueue.isEmpty()) {
224                     try {
225                         
226                         // get the oldest entry and write based on rate
227
DelayQueueEntry qe = sendQueue.peek();
228                         int remaingSize = qe.write(currentTime);
229                         
230                         // if all data of this entry is written remove entry and stay in loop
231
if (remaingSize == 0) {
232                             sendQueue.remove(qe);
233                             
234                             if (LOG.isLoggable(Level.FINE)) {
235                                 LOG.fine("throttling write queue is emtpy");
236                             }
237                             
238                             
239                         // ... else break loop and wait for next time event
240
} else {
241                             break;
242                         }
243                         
244                     } catch (Throwable JavaDoc e) {
245                         if (LOG.isLoggable(Level.FINE)) {
246                             LOG.fine("[" + getId() + "] Error occured while write delayed. Reason: " + e.toString());
247                         }
248                     }
249                 }
250             }
251         }
252     }
253     
254     
255     private final class DelayQueueEntry {
256         private ByteBuffer JavaDoc buffer = null;
257         private int bytesPerSec = 0;
258         private long lastWriteTime = 0;
259         
260         
261         DelayQueueEntry(ByteBuffer JavaDoc buffer, int bytesPerSec) {
262             this.buffer = buffer;
263             this.bytesPerSec = bytesPerSec;
264             this.lastWriteTime = System.currentTimeMillis();
265         }
266
267         
268         ByteBuffer JavaDoc getBuffer() {
269             return buffer;
270         }
271         
272         
273         int write(long currentTime) throws IOException JavaDoc {
274             int remaingSize = buffer.remaining();
275             
276             long elapsedTimeMillis = currentTime - lastWriteTime;
277             
278             if (elapsedTimeMillis > 0) {
279                 int elapsedTimeSec = ((int) (elapsedTimeMillis)) / 1000;
280                 
281                 if (elapsedTimeSec > 0) {
282                     int sizeToWrite = bytesPerSec * elapsedTimeSec;
283                     
284                     if (sizeToWrite > 0) {
285                         ByteBuffer JavaDoc bytesToWrite = null;
286                         if (buffer.remaining() <= sizeToWrite) {
287                             bytesToWrite = buffer;
288                             remaingSize = 0;
289                             
290                         } else {
291                             int saveLimit = buffer.limit();
292                             buffer.limit(sizeToWrite);
293                             bytesToWrite = buffer.slice();
294                             buffer.position(buffer.limit());
295                             buffer.limit(saveLimit);
296                             buffer = buffer.slice();
297                             remaingSize = buffer.remaining();
298                         }
299                         
300                         lastWriteTime = currentTime;
301                         if (LOG.isLoggable(Level.FINE)) {
302                             LOG.fine("[" + getId() + "] release " + sizeToWrite + " bytes from delay queue");
303                         }
304                         getSuccessor().writeOutgoing(bytesToWrite);
305                     }
306                 }
307             }
308             
309             return remaingSize;
310         }
311         
312         
313         
314         @Override JavaDoc
315         public String JavaDoc toString() {
316             return "buffer " + DataConverter.toFormatedBytesSize(buffer.remaining()) + " (write rate " + bytesPerSec + " bytes/sec)";
317         }
318     }
319     
320     
321     
322     /**
323      * {@inheritDoc}
324      */

325     @Override JavaDoc
326     public String JavaDoc toString() {
327         try {
328             return this.getClass().getSimpleName() + "(pending delayQueueSize=" + DataConverter.toFormatedBytesSize(getPendingWriteDataSize()) + ") ->" + "\r\n" + getSuccessor().toString();
329         } catch (Exception JavaDoc e) {
330             return super.toString();
331         }
332     }
333
334 }
335
Popular Tags