KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > mina > filter > StreamWriteFilterTest


1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements. See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership. The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License. You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing,
13  * software distributed under the License is distributed on an
14  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15  * KIND, either express or implied. See the License for the
16  * specific language governing permissions and limitations
17  * under the License.
18  *
19  */

20 package org.apache.mina.filter;
21
22 import java.io.ByteArrayInputStream JavaDoc;
23 import java.io.IOException JavaDoc;
24 import java.io.InputStream JavaDoc;
25 import java.net.InetSocketAddress JavaDoc;
26 import java.net.SocketAddress JavaDoc;
27 import java.security.MessageDigest JavaDoc;
28 import java.util.LinkedList JavaDoc;
29 import java.util.Queue JavaDoc;
30 import java.util.Random JavaDoc;
31
32 import junit.framework.TestCase;
33 import org.apache.mina.common.ByteBuffer;
34 import org.apache.mina.common.IdleStatus;
35 import org.apache.mina.common.IoAcceptor;
36 import org.apache.mina.common.IoConnector;
37 import org.apache.mina.common.IoFilter.NextFilter;
38 import org.apache.mina.common.IoFilter.WriteRequest;
39 import org.apache.mina.common.IoFutureListener;
40 import org.apache.mina.common.IoHandlerAdapter;
41 import org.apache.mina.common.IoSession;
42 import org.apache.mina.common.WriteFuture;
43 import org.apache.mina.common.support.DefaultWriteFuture;
44 import org.apache.mina.transport.socket.nio.SocketAcceptor;
45 import org.apache.mina.transport.socket.nio.SocketAcceptorConfig;
46 import org.apache.mina.transport.socket.nio.SocketConnector;
47 import org.apache.mina.util.AvailablePortFinder;
48 import org.easymock.AbstractMatcher;
49 import org.easymock.MockControl;
50
51 /**
52  * Tests {@link StreamWriteFilter}.
53  *
54  * @author The Apache Directory Project (mina-dev@directory.apache.org)
55  * @version $Rev$, $Date$
56  */

57 public class StreamWriteFilterTest extends TestCase {
58     MockControl mockSession;
59
60     MockControl mockNextFilter;
61
62     IoSession session;
63
64     NextFilter nextFilter;
65
66     @Override JavaDoc
67     protected void setUp() throws Exception JavaDoc {
68         super.setUp();
69
70         /*
71          * Create the mocks.
72          */

73         mockSession = MockControl.createControl(IoSession.class);
74         mockNextFilter = MockControl.createControl(NextFilter.class);
75         session = (IoSession) mockSession.getMock();
76         nextFilter = (NextFilter) mockNextFilter.getMock();
77
78         session.getAttribute(StreamWriteFilter.CURRENT_STREAM);
79         mockSession.setReturnValue(null);
80     }
81
82     public void testWriteEmptyStream() throws Exception JavaDoc {
83         StreamWriteFilter filter = new StreamWriteFilter();
84
85         InputStream JavaDoc stream = new ByteArrayInputStream JavaDoc(new byte[0]);
86         WriteRequest writeRequest = new WriteRequest(stream,
87                 new DummyWriteFuture());
88
89         /*
90          * Record expectations
91          */

92         nextFilter.messageSent(session, stream);
93
94         /*
95          * Replay.
96          */

97         mockNextFilter.replay();
98         mockSession.replay();
99
100         filter.filterWrite(nextFilter, session, writeRequest);
101
102         /*
103          * Verify.
104          */

105         mockNextFilter.verify();
106         mockSession.verify();
107
108         assertTrue(writeRequest.getFuture().isWritten());
109     }
110
111     /**
112      * Tests that the filter just passes objects which aren't InputStreams
113      * through to the next filter.
114      */

115     public void testWriteNonStreamMessage() throws Exception JavaDoc {
116         StreamWriteFilter filter = new StreamWriteFilter();
117
118         Object JavaDoc message = new Object JavaDoc();
119         WriteRequest writeRequest = new WriteRequest(message,
120                 new DummyWriteFuture());
121
122         /*
123          * Record expectations
124          */

125         nextFilter.filterWrite(session, writeRequest);
126         session.getAttribute(StreamWriteFilter.CURRENT_STREAM);
127         mockSession.setReturnValue(null);
128         nextFilter.messageSent(session, message);
129
130         /*
131          * Replay.
132          */

133         mockNextFilter.replay();
134         mockSession.replay();
135
136         filter.filterWrite(nextFilter, session, writeRequest);
137         filter.messageSent(nextFilter, session, message);
138
139         /*
140          * Verify.
141          */

142         mockNextFilter.verify();
143         mockSession.verify();
144     }
145
146     /**
147      * Tests when the contents of the stream fits into one write buffer.
148      */

149     public void testWriteSingleBufferStream() throws Exception JavaDoc {
150         StreamWriteFilter filter = new StreamWriteFilter();
151
152         byte[] data = new byte[] { 1, 2, 3, 4 };
153
154         InputStream JavaDoc stream = new ByteArrayInputStream JavaDoc(data);
155         WriteRequest writeRequest = new WriteRequest(stream,
156                 new DummyWriteFuture());
157
158         /*
159          * Record expectations
160          */

161         session.setAttribute(StreamWriteFilter.CURRENT_STREAM, stream);
162         mockSession.setReturnValue(null);
163         session.setAttribute(StreamWriteFilter.INITIAL_WRITE_FUTURE,
164                 writeRequest.getFuture());
165         mockSession.setReturnValue(null);
166         nextFilter
167                 .filterWrite(session, new WriteRequest(ByteBuffer.wrap(data)));
168         mockNextFilter.setMatcher(new WriteRequestMatcher());
169
170         session.getAttribute(StreamWriteFilter.CURRENT_STREAM);
171         mockSession.setReturnValue(stream);
172         session.removeAttribute(StreamWriteFilter.CURRENT_STREAM);
173         mockSession.setReturnValue(stream);
174         session.removeAttribute(StreamWriteFilter.INITIAL_WRITE_FUTURE);
175         mockSession.setReturnValue(writeRequest.getFuture());
176         session.removeAttribute(StreamWriteFilter.WRITE_REQUEST_QUEUE);
177         mockSession.setReturnValue(null);
178         nextFilter.messageSent(session, stream);
179
180         /*
181          * Replay.
182          */

183         mockNextFilter.replay();
184         mockSession.replay();
185
186         filter.filterWrite(nextFilter, session, writeRequest);
187         filter.messageSent(nextFilter, session, data);
188
189         /*
190          * Verify.
191          */

192         mockNextFilter.verify();
193         mockSession.verify();
194
195         assertTrue(writeRequest.getFuture().isWritten());
196     }
197
198     /**
199      * Tests when the contents of the stream doesn't fit into one write buffer.
200      */

201     public void testWriteSeveralBuffersStream() throws Exception JavaDoc {
202         StreamWriteFilter filter = new StreamWriteFilter();
203         filter.setWriteBufferSize(4);
204
205         byte[] data = new byte[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 };
206         byte[] chunk1 = new byte[] { 1, 2, 3, 4 };
207         byte[] chunk2 = new byte[] { 5, 6, 7, 8 };
208         byte[] chunk3 = new byte[] { 9, 10 };
209
210         InputStream JavaDoc stream = new ByteArrayInputStream JavaDoc(data);
211         WriteRequest writeRequest = new WriteRequest(stream,
212                 new DummyWriteFuture());
213
214         /*
215          * Record expectations
216          */

217         session.setAttribute(StreamWriteFilter.CURRENT_STREAM, stream);
218         mockSession.setReturnValue(null);
219         session.setAttribute(StreamWriteFilter.INITIAL_WRITE_FUTURE,
220                 writeRequest.getFuture());
221         mockSession.setReturnValue(null);
222         nextFilter.filterWrite(session, new WriteRequest(ByteBuffer
223                 .wrap(chunk1)));
224         mockNextFilter.setMatcher(new WriteRequestMatcher());
225
226         session.getAttribute(StreamWriteFilter.CURRENT_STREAM);
227         mockSession.setReturnValue(stream);
228         nextFilter.filterWrite(session, new WriteRequest(ByteBuffer
229                 .wrap(chunk2)));
230
231         session.getAttribute(StreamWriteFilter.CURRENT_STREAM);
232         mockSession.setReturnValue(stream);
233         nextFilter.filterWrite(session, new WriteRequest(ByteBuffer
234                 .wrap(chunk3)));
235
236         session.getAttribute(StreamWriteFilter.CURRENT_STREAM);
237         mockSession.setReturnValue(stream);
238         session.removeAttribute(StreamWriteFilter.CURRENT_STREAM);
239         mockSession.setReturnValue(stream);
240         session.removeAttribute(StreamWriteFilter.INITIAL_WRITE_FUTURE);
241         mockSession.setReturnValue(writeRequest.getFuture());
242         session.removeAttribute(StreamWriteFilter.WRITE_REQUEST_QUEUE);
243         mockSession.setReturnValue(null);
244         nextFilter.messageSent(session, stream);
245
246         /*
247          * Replay.
248          */

249         mockNextFilter.replay();
250         mockSession.replay();
251
252         filter.filterWrite(nextFilter, session, writeRequest);
253         filter.messageSent(nextFilter, session, chunk1);
254         filter.messageSent(nextFilter, session, chunk2);
255         filter.messageSent(nextFilter, session, chunk3);
256
257         /*
258          * Verify.
259          */

260         mockNextFilter.verify();
261         mockSession.verify();
262
263         assertTrue(writeRequest.getFuture().isWritten());
264     }
265
266     public void testWriteWhileWriteInProgress() throws Exception JavaDoc {
267         StreamWriteFilter filter = new StreamWriteFilter();
268
269         Queue JavaDoc<? extends Object JavaDoc> queue = new LinkedList JavaDoc<Object JavaDoc>();
270         InputStream JavaDoc stream = new ByteArrayInputStream JavaDoc(new byte[5]);
271
272         /*
273          * Record expectations
274          */

275         mockSession.reset();
276         session.getAttribute(StreamWriteFilter.CURRENT_STREAM);
277         mockSession.setReturnValue(stream);
278         session.getAttribute(StreamWriteFilter.WRITE_REQUEST_QUEUE);
279         mockSession.setReturnValue(queue);
280
281         /*
282          * Replay.
283          */

284         mockNextFilter.replay();
285         mockSession.replay();
286
287         WriteRequest wr = new WriteRequest(new Object JavaDoc(), new DummyWriteFuture());
288         filter.filterWrite(nextFilter, session, wr);
289         assertEquals(1, queue.size());
290         assertSame(wr, queue.poll());
291
292         /*
293          * Verify.
294          */

295         mockNextFilter.verify();
296         mockSession.verify();
297     }
298
299     public void testWritesWriteRequestQueueWhenFinished() throws Exception JavaDoc {
300         StreamWriteFilter filter = new StreamWriteFilter();
301
302         WriteRequest wrs[] = new WriteRequest[] {
303                 new WriteRequest(new Object JavaDoc(), new DummyWriteFuture()),
304                 new WriteRequest(new Object JavaDoc(), new DummyWriteFuture()),
305                 new WriteRequest(new Object JavaDoc(), new DummyWriteFuture()) };
306         Queue JavaDoc<WriteRequest> queue = new LinkedList JavaDoc<WriteRequest>();
307         queue.add(wrs[0]);
308         queue.add(wrs[1]);
309         queue.add(wrs[2]);
310         InputStream JavaDoc stream = new ByteArrayInputStream JavaDoc(new byte[0]);
311
312         /*
313          * Record expectations
314          */

315         mockSession.reset();
316
317         session.getAttribute(StreamWriteFilter.CURRENT_STREAM);
318         mockSession.setReturnValue(stream);
319         session.removeAttribute(StreamWriteFilter.CURRENT_STREAM);
320         mockSession.setReturnValue(stream);
321         session.removeAttribute(StreamWriteFilter.INITIAL_WRITE_FUTURE);
322         mockSession.setReturnValue(new DefaultWriteFuture(session));
323         session.removeAttribute(StreamWriteFilter.WRITE_REQUEST_QUEUE);
324         mockSession.setReturnValue(queue);
325
326         nextFilter.filterWrite(session, wrs[0]);
327         session.getAttribute(StreamWriteFilter.CURRENT_STREAM);
328         mockSession.setReturnValue(null);
329         nextFilter.filterWrite(session, wrs[1]);
330         session.getAttribute(StreamWriteFilter.CURRENT_STREAM);
331         mockSession.setReturnValue(null);
332         nextFilter.filterWrite(session, wrs[2]);
333         session.getAttribute(StreamWriteFilter.CURRENT_STREAM);
334         mockSession.setReturnValue(null);
335
336         nextFilter.messageSent(session, stream);
337
338         /*
339          * Replay.
340          */

341         mockNextFilter.replay();
342         mockSession.replay();
343
344         filter.messageSent(nextFilter, session, new Object JavaDoc());
345         assertEquals(0, queue.size());
346
347         /*
348          * Verify.
349          */

350         mockNextFilter.verify();
351         mockSession.verify();
352     }
353
354     /**
355      * Tests that {@link StreamWriteFilter#setWriteBufferSize(int)} checks the
356      * specified size.
357      */

358     public void testSetWriteBufferSize() throws Exception JavaDoc {
359         StreamWriteFilter filter = new StreamWriteFilter();
360
361         try {
362             filter.setWriteBufferSize(0);
363             fail("0 writeBuferSize specified. IllegalArgumentException expected.");
364         } catch (IllegalArgumentException JavaDoc iae) {
365         }
366
367         try {
368             filter.setWriteBufferSize(-100);
369             fail("Negative writeBuferSize specified. IllegalArgumentException expected.");
370         } catch (IllegalArgumentException JavaDoc iae) {
371         }
372
373         filter.setWriteBufferSize(1);
374         assertEquals(1, filter.getWriteBufferSize());
375         filter.setWriteBufferSize(1024);
376         assertEquals(1024, filter.getWriteBufferSize());
377     }
378
379     public void testWriteUsingSocketTransport() throws Exception JavaDoc {
380         IoAcceptor acceptor = new SocketAcceptor();
381         ((SocketAcceptorConfig) acceptor.getDefaultConfig())
382                 .setReuseAddress(true);
383         SocketAddress JavaDoc address = new InetSocketAddress JavaDoc("localhost",
384                 AvailablePortFinder.getNextAvailable());
385
386         IoConnector connector = new SocketConnector();
387
388         FixedRandomInputStream stream = new FixedRandomInputStream(
389                 4 * 1024 * 1024);
390
391         SenderHandler sender = new SenderHandler(stream);
392         ReceiverHandler receiver = new ReceiverHandler(stream.size);
393
394         acceptor.bind(address, sender);
395
396         synchronized (sender.lock) {
397             synchronized (receiver.lock) {
398                 connector.connect(address, receiver);
399
400                 sender.lock.wait();
401                 receiver.lock.wait();
402             }
403         }
404
405         acceptor.unbind(address);
406
407         assertEquals(stream.bytesRead, receiver.bytesRead);
408         assertEquals(stream.size, receiver.bytesRead);
409         byte[] expectedMd5 = stream.digest.digest();
410         byte[] actualMd5 = receiver.digest.digest();
411         assertEquals(expectedMd5.length, actualMd5.length);
412         for (int i = 0; i < expectedMd5.length; i++) {
413             assertEquals(expectedMd5[i], actualMd5[i]);
414         }
415     }
416
417     private static class FixedRandomInputStream extends InputStream JavaDoc {
418         long size;
419
420         long bytesRead = 0;
421
422         Random JavaDoc random = new Random JavaDoc();
423
424         MessageDigest JavaDoc digest;
425
426         FixedRandomInputStream(long size) throws Exception JavaDoc {
427             this.size = size;
428             digest = MessageDigest.getInstance("MD5");
429         }
430
431         @Override JavaDoc
432         public int read() throws IOException JavaDoc {
433             if (isAllWritten())
434                 return -1;
435             bytesRead++;
436             byte b = (byte) random.nextInt(255);
437             digest.update(b);
438             return b;
439         }
440
441         public long getBytesRead() {
442             return bytesRead;
443         }
444
445         public long getSize() {
446             return size;
447         }
448
449         public boolean isAllWritten() {
450             return bytesRead >= size;
451         }
452     }
453
454     private static class SenderHandler extends IoHandlerAdapter {
455         final Object JavaDoc lock = new Object JavaDoc();
456
457         InputStream JavaDoc inputStream;
458
459         StreamWriteFilter streamWriteFilter = new StreamWriteFilter();
460
461         SenderHandler(InputStream JavaDoc inputStream) {
462             this.inputStream = inputStream;
463         }
464
465         @Override JavaDoc
466         public void sessionCreated(IoSession session) throws Exception JavaDoc {
467             super.sessionCreated(session);
468             session.getFilterChain().addLast("codec", streamWriteFilter);
469         }
470
471         @Override JavaDoc
472         public void sessionOpened(IoSession session) throws Exception JavaDoc {
473             session.write(inputStream);
474         }
475
476         @Override JavaDoc
477         public void exceptionCaught(IoSession session, Throwable JavaDoc cause)
478                 throws Exception JavaDoc {
479             synchronized (lock) {
480                 lock.notifyAll();
481             }
482         }
483
484         @Override JavaDoc
485         public void sessionClosed(IoSession session) throws Exception JavaDoc {
486             synchronized (lock) {
487                 lock.notifyAll();
488             }
489         }
490
491         @Override JavaDoc
492         public void sessionIdle(IoSession session, IdleStatus status)
493                 throws Exception JavaDoc {
494             synchronized (lock) {
495                 lock.notifyAll();
496             }
497         }
498
499         @Override JavaDoc
500         public void messageSent(IoSession session, Object JavaDoc message)
501                 throws Exception JavaDoc {
502             if (message == inputStream) {
503                 synchronized (lock) {
504                     lock.notifyAll();
505                 }
506             }
507         }
508     }
509
510     private static class ReceiverHandler extends IoHandlerAdapter {
511         final Object JavaDoc lock = new Object JavaDoc();
512
513         long bytesRead = 0;
514
515         long size = 0;
516
517         MessageDigest JavaDoc digest;
518
519         ReceiverHandler(long size) throws Exception JavaDoc {
520             this.size = size;
521             digest = MessageDigest.getInstance("MD5");
522         }
523
524         @Override JavaDoc
525         public void sessionCreated(IoSession session) throws Exception JavaDoc {
526             super.sessionCreated(session);
527
528             session.setIdleTime(IdleStatus.READER_IDLE, 5);
529         }
530
531         @Override JavaDoc
532         public void sessionIdle(IoSession session, IdleStatus status)
533                 throws Exception JavaDoc {
534             session.close();
535         }
536
537         @Override JavaDoc
538         public void exceptionCaught(IoSession session, Throwable JavaDoc cause)
539                 throws Exception JavaDoc {
540             synchronized (lock) {
541                 lock.notifyAll();
542             }
543         }
544
545         @Override JavaDoc
546         public void sessionClosed(IoSession session) throws Exception JavaDoc {
547             synchronized (lock) {
548                 lock.notifyAll();
549             }
550         }
551
552         @Override JavaDoc
553         public void messageReceived(IoSession session, Object JavaDoc message)
554                 throws Exception JavaDoc {
555             ByteBuffer buf = (ByteBuffer) message;
556             while (buf.hasRemaining()) {
557                 digest.update(buf.get());
558                 bytesRead++;
559             }
560             if (bytesRead >= size) {
561                 session.close();
562             }
563         }
564     }
565
566     public static class WriteRequestMatcher extends AbstractMatcher {
567         @Override JavaDoc
568         protected boolean argumentMatches(Object JavaDoc expected, Object JavaDoc actual) {
569             if (expected instanceof WriteRequest
570                     && actual instanceof WriteRequest) {
571                 WriteRequest w1 = (WriteRequest) expected;
572                 WriteRequest w2 = (WriteRequest) actual;
573
574                 return w1.getMessage().equals(w2.getMessage())
575                         && w1.getFuture().isWritten() == w2.getFuture()
576                                 .isWritten();
577             }
578             return super.argumentMatches(expected, actual);
579         }
580     }
581
582     private static class DummyWriteFuture implements WriteFuture {
583         private boolean written;
584
585         public boolean isWritten() {
586             return written;
587         }
588
589         public void setWritten(boolean written) {
590             this.written = written;
591         }
592
593         public IoSession getSession() {
594             return null;
595         }
596
597         public Object JavaDoc getLock() {
598             return this;
599         }
600
601         public void join() {
602         }
603
604         public boolean join(long timeoutInMillis) {
605             return true;
606         }
607
608         public boolean isReady() {
609             return true;
610         }
611
612         public void addListener(IoFutureListener listener) {
613         }
614
615         public void removeListener(IoFutureListener listener) {
616         }
617     }
618 }
619
Popular Tags