package org.apache.mina.filter;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.security.MessageDigest;
import java.util.LinkedList;
import java.util.Queue;
import java.util.Random;

import junit.framework.TestCase;
import org.apache.mina.common.ByteBuffer;
import org.apache.mina.common.IdleStatus;
import org.apache.mina.common.IoAcceptor;
import org.apache.mina.common.IoConnector;
import org.apache.mina.common.IoFilter.NextFilter;
import org.apache.mina.common.IoFilter.WriteRequest;
import org.apache.mina.common.IoFutureListener;
import org.apache.mina.common.IoHandlerAdapter;
import org.apache.mina.common.IoSession;
import org.apache.mina.common.WriteFuture;
import org.apache.mina.common.support.DefaultWriteFuture;
import org.apache.mina.transport.socket.nio.SocketAcceptor;
import org.apache.mina.transport.socket.nio.SocketAcceptorConfig;
import org.apache.mina.transport.socket.nio.SocketConnector;
import org.apache.mina.util.AvailablePortFinder;
import org.easymock.AbstractMatcher;
import org.easymock.MockControl;

/**
 * Tests {@link StreamWriteFilter}.
 * <p>
 * Most tests drive the filter against EasyMock mocks of {@link IoSession} and
 * {@link NextFilter} using the record / replay / verify cycle;
 * {@link #testWriteUsingSocketTransport()} additionally pushes a stream
 * through a real socket acceptor/connector pair.
 */
public class StreamWriteFilterTest extends TestCase {
    MockControl mockSession;

    MockControl mockNextFilter;

    IoSession session;

    NextFilter nextFilter;

    /**
     * Creates fresh mocks for every test and records one default expectation:
     * a lookup of {@code CURRENT_STREAM} on the session that returns
     * {@code null}. Tests that need a different first lookup call
     * {@code mockSession.reset()} before recording their own expectations.
     */
    @Override
    protected void setUp() throws Exception {
        super.setUp();

        // Create the mocks.
        mockSession = MockControl.createControl(IoSession.class);
        mockNextFilter = MockControl.createControl(NextFilter.class);
        session = (IoSession) mockSession.getMock();
        nextFilter = (NextFilter) mockNextFilter.getMock();

        session.getAttribute(StreamWriteFilter.CURRENT_STREAM);
        mockSession.setReturnValue(null);
    }

    /**
     * Writing an empty stream is expected to forward {@code messageSent} for
     * the stream to the next filter and to mark the write future as written.
     */
    public void testWriteEmptyStream() throws Exception {
        StreamWriteFilter filter = new StreamWriteFilter();

        InputStream stream = new ByteArrayInputStream(new byte[0]);
        WriteRequest writeRequest = new WriteRequest(stream,
                new DummyWriteFuture());

        // Record expectations.
        nextFilter.messageSent(session, stream);

        // Replay.
        mockNextFilter.replay();
        mockSession.replay();

        filter.filterWrite(nextFilter, session, writeRequest);

        // Verify.
        mockNextFilter.verify();
        mockSession.verify();

        assertTrue(writeRequest.getFuture().isWritten());
    }

    /**
     * A message that is not a stream is expected to be passed to the next
     * filter unchanged, both for {@code filterWrite} and {@code messageSent}.
     */
    public void testWriteNonStreamMessage() throws Exception {
        StreamWriteFilter filter = new StreamWriteFilter();

        Object message = new Object();
        WriteRequest writeRequest = new WriteRequest(message,
                new DummyWriteFuture());

        // Record expectations.
        nextFilter.filterWrite(session, writeRequest);
        session.getAttribute(StreamWriteFilter.CURRENT_STREAM);
        mockSession.setReturnValue(null);
        nextFilter.messageSent(session, message);

        // Replay.
        mockNextFilter.replay();
        mockSession.replay();

        filter.filterWrite(nextFilter, session, writeRequest);
        filter.messageSent(nextFilter, session, message);

        // Verify.
        mockNextFilter.verify();
        mockSession.verify();
    }

    /**
     * Writes a stream whose content is expected to go out as a single
     * buffer, after which the stream attributes are removed from the session
     * and {@code messageSent} is forwarded for the stream itself.
     */
    public void testWriteSingleBufferStream() throws Exception {
        StreamWriteFilter filter = new StreamWriteFilter();

        byte[] data = new byte[] { 1, 2, 3, 4 };

        InputStream stream = new ByteArrayInputStream(data);
        WriteRequest writeRequest = new WriteRequest(stream,
                new DummyWriteFuture());

        // Record expectations.
        session.setAttribute(StreamWriteFilter.CURRENT_STREAM, stream);
        mockSession.setReturnValue(null);
        session.setAttribute(StreamWriteFilter.INITIAL_WRITE_FUTURE,
                writeRequest.getFuture());
        mockSession.setReturnValue(null);
        nextFilter
                .filterWrite(session, new WriteRequest(ByteBuffer.wrap(data)));
        // Compare WriteRequests by message content rather than identity.
        mockNextFilter.setMatcher(new WriteRequestMatcher());

        session.getAttribute(StreamWriteFilter.CURRENT_STREAM);
        mockSession.setReturnValue(stream);
        session.removeAttribute(StreamWriteFilter.CURRENT_STREAM);
        mockSession.setReturnValue(stream);
        session.removeAttribute(StreamWriteFilter.INITIAL_WRITE_FUTURE);
        mockSession.setReturnValue(writeRequest.getFuture());
        session.removeAttribute(StreamWriteFilter.WRITE_REQUEST_QUEUE);
        mockSession.setReturnValue(null);
        nextFilter.messageSent(session, stream);

        // Replay.
        mockNextFilter.replay();
        mockSession.replay();

        filter.filterWrite(nextFilter, session, writeRequest);
        filter.messageSent(nextFilter, session, data);

        // Verify.
        mockNextFilter.verify();
        mockSession.verify();

        assertTrue(writeRequest.getFuture().isWritten());
    }

    /**
     * With a write buffer size of 4, a 10-byte stream is expected to be
     * written as three buffers (4 + 4 + 2 bytes), one per
     * {@code messageSent} round trip, before the stream is reported as sent.
     */
    public void testWriteSeveralBuffersStream() throws Exception {
        StreamWriteFilter filter = new StreamWriteFilter();
        filter.setWriteBufferSize(4);

        byte[] data = new byte[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 };
        byte[] chunk1 = new byte[] { 1, 2, 3, 4 };
        byte[] chunk2 = new byte[] { 5, 6, 7, 8 };
        byte[] chunk3 = new byte[] { 9, 10 };

        InputStream stream = new ByteArrayInputStream(data);
        WriteRequest writeRequest = new WriteRequest(stream,
                new DummyWriteFuture());

        // Record expectations.
        session.setAttribute(StreamWriteFilter.CURRENT_STREAM, stream);
        mockSession.setReturnValue(null);
        session.setAttribute(StreamWriteFilter.INITIAL_WRITE_FUTURE,
                writeRequest.getFuture());
        mockSession.setReturnValue(null);
        nextFilter.filterWrite(session, new WriteRequest(ByteBuffer
                .wrap(chunk1)));
        // Compare WriteRequests by message content rather than identity.
        mockNextFilter.setMatcher(new WriteRequestMatcher());

        session.getAttribute(StreamWriteFilter.CURRENT_STREAM);
        mockSession.setReturnValue(stream);
        nextFilter.filterWrite(session, new WriteRequest(ByteBuffer
                .wrap(chunk2)));

        session.getAttribute(StreamWriteFilter.CURRENT_STREAM);
        mockSession.setReturnValue(stream);
        nextFilter.filterWrite(session, new WriteRequest(ByteBuffer
                .wrap(chunk3)));

        session.getAttribute(StreamWriteFilter.CURRENT_STREAM);
        mockSession.setReturnValue(stream);
        session.removeAttribute(StreamWriteFilter.CURRENT_STREAM);
        mockSession.setReturnValue(stream);
        session.removeAttribute(StreamWriteFilter.INITIAL_WRITE_FUTURE);
        mockSession.setReturnValue(writeRequest.getFuture());
        session.removeAttribute(StreamWriteFilter.WRITE_REQUEST_QUEUE);
        mockSession.setReturnValue(null);
        nextFilter.messageSent(session, stream);

        // Replay.
        mockNextFilter.replay();
        mockSession.replay();

        filter.filterWrite(nextFilter, session, writeRequest);
        filter.messageSent(nextFilter, session, chunk1);
        filter.messageSent(nextFilter, session, chunk2);
        filter.messageSent(nextFilter, session, chunk3);

        // Verify.
        mockNextFilter.verify();
        mockSession.verify();

        assertTrue(writeRequest.getFuture().isWritten());
    }

    /**
     * While a stream is registered as {@code CURRENT_STREAM}, a new write
     * request is expected to be added to the session's write request queue
     * instead of being passed to the next filter.
     */
    public void testWriteWhileWriteInProgress() throws Exception {
        StreamWriteFilter filter = new StreamWriteFilter();

        Queue<? extends Object> queue = new LinkedList<Object>();
        InputStream stream = new ByteArrayInputStream(new byte[5]);

        // Record expectations (discarding the default one from setUp()).
        mockSession.reset();
        session.getAttribute(StreamWriteFilter.CURRENT_STREAM);
        mockSession.setReturnValue(stream);
        session.getAttribute(StreamWriteFilter.WRITE_REQUEST_QUEUE);
        mockSession.setReturnValue(queue);

        // Replay.
        mockNextFilter.replay();
        mockSession.replay();

        WriteRequest wr = new WriteRequest(new Object(), new DummyWriteFuture());
        filter.filterWrite(nextFilter, session, wr);
        assertEquals(1, queue.size());
        assertSame(wr, queue.poll());

        // Verify.
        mockNextFilter.verify();
        mockSession.verify();
    }

    /**
     * Once the current stream is exhausted, every request waiting in the
     * write request queue is expected to be forwarded to the next filter, in
     * order, leaving the queue empty.
     */
    public void testWritesWriteRequestQueueWhenFinished() throws Exception {
        StreamWriteFilter filter = new StreamWriteFilter();

        WriteRequest[] wrs = new WriteRequest[] {
                new WriteRequest(new Object(), new DummyWriteFuture()),
                new WriteRequest(new Object(), new DummyWriteFuture()),
                new WriteRequest(new Object(), new DummyWriteFuture()) };
        Queue<WriteRequest> queue = new LinkedList<WriteRequest>();
        queue.add(wrs[0]);
        queue.add(wrs[1]);
        queue.add(wrs[2]);
        InputStream stream = new ByteArrayInputStream(new byte[0]);

        // Record expectations (discarding the default one from setUp()).
        mockSession.reset();

        session.getAttribute(StreamWriteFilter.CURRENT_STREAM);
        mockSession.setReturnValue(stream);
        session.removeAttribute(StreamWriteFilter.CURRENT_STREAM);
        mockSession.setReturnValue(stream);
        session.removeAttribute(StreamWriteFilter.INITIAL_WRITE_FUTURE);
        mockSession.setReturnValue(new DefaultWriteFuture(session));
        session.removeAttribute(StreamWriteFilter.WRITE_REQUEST_QUEUE);
        mockSession.setReturnValue(queue);

        nextFilter.filterWrite(session, wrs[0]);
        session.getAttribute(StreamWriteFilter.CURRENT_STREAM);
        mockSession.setReturnValue(null);
        nextFilter.filterWrite(session, wrs[1]);
        session.getAttribute(StreamWriteFilter.CURRENT_STREAM);
        mockSession.setReturnValue(null);
        nextFilter.filterWrite(session, wrs[2]);
        session.getAttribute(StreamWriteFilter.CURRENT_STREAM);
        mockSession.setReturnValue(null);

        nextFilter.messageSent(session, stream);

        // Replay.
        mockNextFilter.replay();
        mockSession.replay();

        filter.messageSent(nextFilter, session, new Object());
        assertEquals(0, queue.size());

        // Verify.
        mockNextFilter.verify();
        mockSession.verify();
    }

    /**
     * {@code setWriteBufferSize} must reject zero and negative sizes with an
     * {@link IllegalArgumentException} and accept positive ones.
     */
    public void testSetWriteBufferSize() throws Exception {
        StreamWriteFilter filter = new StreamWriteFilter();

        try {
            filter.setWriteBufferSize(0);
            fail("0 writeBuferSize specified. IllegalArgumentException expected.");
        } catch (IllegalArgumentException expected) {
            // Expected: a zero buffer size is invalid.
        }

        try {
            filter.setWriteBufferSize(-100);
            fail("Negative writeBuferSize specified. IllegalArgumentException expected.");
        } catch (IllegalArgumentException expected) {
            // Expected: a negative buffer size is invalid.
        }

        filter.setWriteBufferSize(1);
        assertEquals(1, filter.getWriteBufferSize());
        filter.setWriteBufferSize(1024);
        assertEquals(1024, filter.getWriteBufferSize());
    }

    /**
     * Sends 4 MiB of random data through a real socket connection and checks
     * that the receiver saw the same number of bytes with the same MD5
     * digest as the stream produced.
     */
    public void testWriteUsingSocketTransport() throws Exception {
        IoAcceptor acceptor = new SocketAcceptor();
        ((SocketAcceptorConfig) acceptor.getDefaultConfig())
                .setReuseAddress(true);
        SocketAddress address = new InetSocketAddress("localhost",
                AvailablePortFinder.getNextAvailable());

        IoConnector connector = new SocketConnector();

        FixedRandomInputStream stream = new FixedRandomInputStream(
                4 * 1024 * 1024);

        SenderHandler sender = new SenderHandler(stream);
        ReceiverHandler receiver = new ReceiverHandler(stream.size);

        acceptor.bind(address, sender);

        try {
            // Both monitors are taken before connecting: the handlers must
            // acquire the same monitor to call notifyAll(), so they cannot
            // signal before this thread has reached the matching wait().
            synchronized (sender.lock) {
                synchronized (receiver.lock) {
                    connector.connect(address, receiver);

                    sender.lock.wait();
                    receiver.lock.wait();
                }
            }
        } finally {
            // Release the port even if connecting or waiting failed.
            acceptor.unbind(address);
        }

        assertEquals(stream.bytesRead, receiver.bytesRead);
        assertEquals(stream.size, receiver.bytesRead);
        byte[] expectedMd5 = stream.digest.digest();
        byte[] actualMd5 = receiver.digest.digest();
        assertEquals(expectedMd5.length, actualMd5.length);
        for (int i = 0; i < expectedMd5.length; i++) {
            assertEquals(expectedMd5[i], actualMd5[i]);
        }
    }

    /**
     * An {@link InputStream} that yields a fixed number of pseudo-random
     * bytes and keeps a running MD5 digest of everything it has produced.
     */
    private static class FixedRandomInputStream extends InputStream {
        long size;

        long bytesRead = 0;

        Random random = new Random();

        MessageDigest digest;

        /**
         * @param size total number of bytes the stream produces before
         *             reporting end of stream
         * @throws Exception if the MD5 digest cannot be created
         */
        FixedRandomInputStream(long size) throws Exception {
            this.size = size;
            digest = MessageDigest.getInstance("MD5");
        }

        /**
         * Produces the next random byte and feeds it into the digest.
         *
         * @return the byte as an unsigned value in the range 0..255, or -1
         *         once {@code size} bytes have been produced
         */
        @Override
        public int read() throws IOException {
            if (isAllWritten()) {
                return -1;
            }
            bytesRead++;
            // InputStream.read() must return 0..255; returning the signed
            // byte would leak negative values (and 0xFF would collide with
            // the -1 end-of-stream marker), so keep the value unsigned.
            int value = random.nextInt(256);
            digest.update((byte) value);
            return value;
        }

        /** @return number of bytes produced so far */
        public long getBytesRead() {
            return bytesRead;
        }

        /** @return total number of bytes this stream will produce */
        public long getSize() {
            return size;
        }

        /** @return {@code true} once at least {@code size} bytes were produced */
        public boolean isAllWritten() {
            return bytesRead >= size;
        }
    }

    /**
     * Acceptor-side handler: installs a {@link StreamWriteFilter} on each new
     * session, writes the given stream when the session opens, and wakes up
     * threads waiting on {@link #lock} when the stream has been sent, the
     * session goes idle or closes, or an exception occurs.
     */
    private static class SenderHandler extends IoHandlerAdapter {
        final Object lock = new Object();

        InputStream inputStream;

        StreamWriteFilter streamWriteFilter = new StreamWriteFilter();

        SenderHandler(InputStream inputStream) {
            this.inputStream = inputStream;
        }

        @Override
        public void sessionCreated(IoSession session) throws Exception {
            super.sessionCreated(session);
            session.getFilterChain().addLast("codec", streamWriteFilter);
        }

        @Override
        public void sessionOpened(IoSession session) throws Exception {
            session.write(inputStream);
        }

        @Override
        public void exceptionCaught(IoSession session, Throwable cause)
                throws Exception {
            synchronized (lock) {
                lock.notifyAll();
            }
        }

        @Override
        public void sessionClosed(IoSession session) throws Exception {
            synchronized (lock) {
                lock.notifyAll();
            }
        }

        @Override
        public void sessionIdle(IoSession session, IdleStatus status)
                throws Exception {
            synchronized (lock) {
                lock.notifyAll();
            }
        }

        @Override
        public void messageSent(IoSession session, Object message)
                throws Exception {
            // Only the notification for the stream itself signals completion.
            if (message == inputStream) {
                synchronized (lock) {
                    lock.notifyAll();
                }
            }
        }
    }

    /**
     * Connector-side handler: counts and digests every received byte, closes
     * the session once {@code size} bytes have arrived or the reader goes
     * idle, and wakes up threads waiting on {@link #lock} when the session
     * closes or an exception occurs.
     */
    private static class ReceiverHandler extends IoHandlerAdapter {
        final Object lock = new Object();

        long bytesRead = 0;

        long size = 0;

        MessageDigest digest;

        /**
         * @param size number of bytes expected before the session is closed
         * @throws Exception if the MD5 digest cannot be created
         */
        ReceiverHandler(long size) throws Exception {
            this.size = size;
            digest = MessageDigest.getInstance("MD5");
        }

        @Override
        public void sessionCreated(IoSession session) throws Exception {
            super.sessionCreated(session);

            // NOTE(review): idle time is presumably in seconds - confirm
            // against IoSession.setIdleTime.
            session.setIdleTime(IdleStatus.READER_IDLE, 5);
        }

        @Override
        public void sessionIdle(IoSession session, IdleStatus status)
                throws Exception {
            session.close();
        }

        @Override
        public void exceptionCaught(IoSession session, Throwable cause)
                throws Exception {
            synchronized (lock) {
                lock.notifyAll();
            }
        }

        @Override
        public void sessionClosed(IoSession session) throws Exception {
            synchronized (lock) {
                lock.notifyAll();
            }
        }

        @Override
        public void messageReceived(IoSession session, Object message)
                throws Exception {
            ByteBuffer buf = (ByteBuffer) message;
            while (buf.hasRemaining()) {
                digest.update(buf.get());
                bytesRead++;
            }
            if (bytesRead >= size) {
                session.close();
            }
        }
    }

    /**
     * EasyMock argument matcher that treats two {@link WriteRequest}s as
     * equal when their messages are {@code equals} and their futures report
     * the same {@code isWritten()} state; other argument types fall back to
     * the default matching.
     */
    public static class WriteRequestMatcher extends AbstractMatcher {
        @Override
        protected boolean argumentMatches(Object expected, Object actual) {
            if (expected instanceof WriteRequest
                    && actual instanceof WriteRequest) {
                WriteRequest w1 = (WriteRequest) expected;
                WriteRequest w2 = (WriteRequest) actual;

                return w1.getMessage().equals(w2.getMessage())
                        && w1.getFuture().isWritten() == w2.getFuture()
                                .isWritten();
            }
            return super.argumentMatches(expected, actual);
        }
    }

    /**
     * Minimal {@link WriteFuture} that only remembers the written flag; it
     * has no session, is always ready, never blocks on join and ignores
     * listeners.
     */
    private static class DummyWriteFuture implements WriteFuture {
        private boolean written;

        public boolean isWritten() {
            return written;
        }

        public void setWritten(boolean written) {
            this.written = written;
        }

        public IoSession getSession() {
            return null;
        }

        public Object getLock() {
            return this;
        }

        public void join() {
        }

        public boolean join(long timeoutInMillis) {
            return true;
        }

        public boolean isReady() {
            return true;
        }

        public void addListener(IoFutureListener listener) {
        }

        public void removeListener(IoFutureListener listener) {
        }
    }
}