1 22 package org.xsocket.stream; 23 24 import java.io.IOException ; 25 import java.io.UnsupportedEncodingException ; 26 import java.net.InetAddress ; 27 import java.net.InetSocketAddress ; 28 import java.net.SocketTimeoutException ; 29 import java.nio.BufferUnderflowException ; 30 import java.nio.ByteBuffer ; 31 import java.util.HashMap ; 32 import java.util.LinkedList ; 33 import java.util.Map ; 34 35 import javax.net.ssl.SSLContext; 36 37 import org.xsocket.ClosedConnectionException; 38 import org.xsocket.DataConverter; 39 import org.xsocket.MaxReadSizeExceededException; 40 41 42 43 50 public final class BlockingConnection extends Connection implements IBlockingConnection { 51 52 private Object readGuard = new Object (); 54 private long receiveTimeout = 0; 55 56 57 58 65 public BlockingConnection(String hostname, int port) throws IOException { 66 this(new InetSocketAddress (hostname, port), new HashMap <String , Object >(), null, false); 67 } 68 69 70 79 public BlockingConnection(String hostname, int port, StreamSocketConfiguration socketConfiguration) throws IOException { 80 this(new InetSocketAddress (hostname, port), socketConfiguration.toOptions(), null, false); 81 } 82 83 84 92 public BlockingConnection(String hostname, int port, Map <String , Object > options) throws IOException { 93 this(new InetSocketAddress (hostname, port), options, null, false); 94 } 95 96 97 104 public BlockingConnection(InetAddress address, int port) throws IOException { 105 this(new InetSocketAddress (address, port), new HashMap <String , Object >(), null, false); 106 } 107 108 117 public BlockingConnection(InetAddress address, int port, StreamSocketConfiguration socketConf) throws IOException { 118 this(new InetSocketAddress (address, port), socketConf.toOptions(), null, false); 119 } 120 121 122 131 public BlockingConnection(InetAddress address, int port, Map <String , Object > options) throws IOException { 132 this(new InetSocketAddress (address, port), options, null, false); 133 } 134 135 136 137 138 147 public BlockingConnection(InetAddress address, int port, SSLContext sslContext, boolean sslOn) throws IOException { 148 this(new InetSocketAddress (address, port), new HashMap <String , Object >(), sslContext, sslOn); 149 } 150 151 152 153 164 public BlockingConnection(InetAddress address, int port, StreamSocketConfiguration socketConf, SSLContext sslContext, boolean sslOn) throws IOException { 165 this(new InetSocketAddress (address, port), socketConf.toOptions(), sslContext, sslOn); 166 } 167 168 178 public BlockingConnection(InetAddress address, int port, Map <String , Object > options, SSLContext sslContext, boolean sslOn) throws IOException { 179 this(new InetSocketAddress (address, port), options, sslContext, sslOn); 180 } 181 182 191 public BlockingConnection(String hostname, int port, SSLContext sslContext, boolean sslOn) throws IOException { 192 this(new InetSocketAddress (hostname, port), new HashMap <String , Object >(), sslContext, sslOn); 193 } 194 195 196 207 public BlockingConnection(String hostname, int port, StreamSocketConfiguration socketConf, SSLContext sslContext, boolean sslOn) throws IOException { 208 this(new InetSocketAddress (hostname, port), socketConf.toOptions(), sslContext, sslOn); 209 } 210 211 212 213 223 public BlockingConnection(String hostname, int port, Map <String , Object > options, SSLContext sslContext, boolean sslOn) throws IOException { 224 this(new InetSocketAddress (hostname, port), options, sslContext, sslOn); 225 } 226 227 228 229 230 231 234 private BlockingConnection(InetSocketAddress remoteAddress, Map <String , Object > options, SSLContext sslContext, boolean sslOn) throws IOException { 235 super(new IoHandlerContext(null, null), remoteAddress, options, sslContext, sslOn); 236 237 setReceiveTimeoutMillis(INITIAL_RECEIVE_TIMEOUT); 238 setFlushmode(FlushMode.SYNC); 239 240 241 init(); 242 } 243 244 245 246 @Override 247 void reset() throws IOException { 248 readGuard = new Object (); 249 250 super.reset(); 251 252 setReceiveTimeoutMillis(IBlockingConnection.INITIAL_RECEIVE_TIMEOUT); 253 setFlushmode(FlushMode.SYNC); 254 } 255 256 257 258 262 public byte readByte() throws IOException ,ClosedConnectionException, SocketTimeoutException { 263 264 long start = System.currentTimeMillis(); 265 long remainingTime = receiveTimeout; 266 267 synchronized (readGuard) { 268 do { 269 try { 270 return extractByteFromReadQueue(); 271 } catch (BufferUnderflowException bue) { 272 276 try { 277 readGuard.wait(remainingTime); 278 } catch (InterruptedException ignore) { } 279 } 280 remainingTime = (start + receiveTimeout) - System.currentTimeMillis(); 281 } while (remainingTime > 0); 282 } 283 284 throw new SocketTimeoutException ("timeout " + DataConverter.toFormatedDuration(receiveTimeout) + " reached"); 285 } 286 287 288 291 public final void setReceiveTimeoutMillis(long timeout) { 292 this.receiveTimeout = timeout; 293 } 294 295 296 299 public ByteBuffer [] readByteBufferByDelimiter(String delimiter) throws IOException , ClosedConnectionException, SocketTimeoutException { 300 return readByteBufferByDelimiter(delimiter, Integer.MAX_VALUE); 301 } 302 303 304 305 308 public ByteBuffer [] readByteBufferByDelimiter(String delimiter, int maxLength) throws IOException , ClosedConnectionException, SocketTimeoutException , MaxReadSizeExceededException { 309 return readByteBufferByDelimiter(delimiter, getDefaultEncoding(), maxLength); 310 } 311 312 313 316 public ByteBuffer [] readByteBufferByDelimiter(String delimiter, String encoding, int maxLength) throws IOException , ClosedConnectionException, MaxReadSizeExceededException { 317 318 long start = System.currentTimeMillis(); 319 long remainingTime = receiveTimeout; 320 321 synchronized (readGuard) { 322 do { 323 try { 324 LinkedList <ByteBuffer > result = extractBytesByDelimiterFromReadQueue(delimiter.getBytes(encoding), maxLength); 325 return result.toArray(new ByteBuffer [result.size()]); 326 } catch (MaxReadSizeExceededException mee) { 327 throw mee; 328 329 } catch (BufferUnderflowException bue) { 330 331 try { 332 readGuard.wait(remainingTime); 333 } catch (InterruptedException ignore) { } 334 } 335 remainingTime = (start + receiveTimeout) - System.currentTimeMillis(); 336 } while (remainingTime > 0); 337 } 338 339 throw new SocketTimeoutException ("timeout " + DataConverter.toFormatedDuration(receiveTimeout) + " reached"); 340 } 341 342 343 346 public ByteBuffer [] readByteBufferByLength(int length) throws IOException , ClosedConnectionException, SocketTimeoutException { 347 348 if (length <= 0) { 349 return null; 350 } 351 352 long start = System.currentTimeMillis(); 353 long remainingTime = receiveTimeout; 354 355 synchronized (readGuard) { 356 do { 357 try { 358 LinkedList <ByteBuffer > result = extractBytesByLength(length); 359 return result.toArray(new ByteBuffer [result.size()]); 360 } catch (BufferUnderflowException bue) { 361 try { 362 readGuard.wait(remainingTime); 363 } catch (InterruptedException ignore) { } 364 } 365 remainingTime = (start + receiveTimeout) - System.currentTimeMillis(); 366 } while (remainingTime > 0); 367 } 368 369 throw new SocketTimeoutException ("timeout " + DataConverter.toFormatedDuration(receiveTimeout) + " reached"); 370 } 371 372 373 376 public byte[] readBytesByDelimiter(String delimiter) throws IOException ,ClosedConnectionException ,SocketTimeoutException { 377 return readBytesByDelimiter(delimiter, Integer.MAX_VALUE); 378 } 379 380 381 382 385 public byte[] readBytesByDelimiter(String delimiter, int maxLength) throws IOException , ClosedConnectionException, SocketTimeoutException , MaxReadSizeExceededException { 386 return readBytesByDelimiter(delimiter, getDefaultEncoding(), maxLength); 387 } 388 389 390 393 public byte[] readBytesByDelimiter(String delimiter, String encoding, int maxLength) throws IOException , ClosedConnectionException, MaxReadSizeExceededException { 394 return DataConverter.toBytes(readByteBufferByDelimiter(delimiter, encoding, maxLength)); 395 } 396 397 398 401 public byte[] readBytesByLength(int length) throws IOException , ClosedConnectionException, SocketTimeoutException { 402 return DataConverter.toBytes(readByteBufferByLength(length)); 403 } 404 405 408 public double readDouble() throws IOException , ClosedConnectionException, SocketTimeoutException { 409 long start = System.currentTimeMillis(); 410 long remainingTime = receiveTimeout; 411 412 synchronized (readGuard) { 413 do { 414 try { 415 return extractDoubleFromReadQueue(); 416 } catch (BufferUnderflowException bue) { 417 try { 418 readGuard.wait(remainingTime); 419 } catch (InterruptedException ignore) { } 420 } 421 remainingTime = (start + receiveTimeout) - System.currentTimeMillis(); 422 } while (remainingTime> 0); 423 } 424 425 throw new SocketTimeoutException ("timeout " + DataConverter.toFormatedDuration(receiveTimeout) + " reached"); 426 427 } 428 429 432 public int readInt() throws IOException , ClosedConnectionException, SocketTimeoutException { 433 long start = System.currentTimeMillis(); 434 long remainingTime = receiveTimeout; 435 436 synchronized (readGuard) { 437 do { 438 try { 439 return extractIntFromReadQueue(); 440 } catch (BufferUnderflowException bue) { 441 try { 442 readGuard.wait(remainingTime); 443 } catch (InterruptedException ignore) { } 444 } 445 remainingTime = (start + receiveTimeout) - System.currentTimeMillis(); 446 } while (remainingTime > 0 ); 447 } 448 449 throw new SocketTimeoutException ("timeout " + DataConverter.toFormatedDuration(receiveTimeout) + " reached"); 450 451 } 452 453 454 455 456 459 public short readShort() throws IOException , ClosedConnectionException, SocketTimeoutException { 460 long start = System.currentTimeMillis(); 461 long remainingTime = receiveTimeout; 462 463 synchronized (readGuard) { 464 do { 465 try { 466 return extractShortFromReadQueue(); 467 } catch (BufferUnderflowException bue) { 468 try { 469 readGuard.wait(remainingTime); 470 } catch (InterruptedException ignore) { } 471 } 472 remainingTime = (start + receiveTimeout) - System.currentTimeMillis(); 473 } while (remainingTime > 0 ); 474 } 475 476 throw new SocketTimeoutException ("timeout " + DataConverter.toFormatedDuration(receiveTimeout) + " reached"); 477 478 } 479 480 483 public final int read(ByteBuffer buffer) throws IOException { 484 int size = buffer.remaining(); 485 if (size < 1) { 486 return 0; 487 } 488 489 long start = System.currentTimeMillis(); 490 long remainingTime = receiveTimeout; 491 492 synchronized (readGuard) { 493 do { 494 int availableSize = getReadQueue().getSize(); 495 496 if (availableSize > 0) { 498 if (size > availableSize) { 499 size = availableSize; 500 } 501 ByteBuffer [] bufs = readByteBufferByLength(size); 502 503 for (ByteBuffer buf : bufs) { 504 buffer.put(buf); 505 } 506 507 return size; 508 509 }else { 511 try { 512 readGuard.wait(remainingTime); 513 } catch (InterruptedException ignore) { } 514 } 515 remainingTime = (start + receiveTimeout) - System.currentTimeMillis(); 516 } while (remainingTime > 0); 517 } 518 519 throw new SocketTimeoutException ("timeout " + DataConverter.toFormatedDuration(receiveTimeout) + " reached"); 520 } 521 522 523 526 public long readLong() throws IOException , ClosedConnectionException, SocketTimeoutException { 527 long start = System.currentTimeMillis(); 528 long remainingTime = receiveTimeout; 529 530 synchronized (readGuard) { 531 do { 532 try { 533 return extractLongFromReadQueue(); 534 } catch (BufferUnderflowException bue) { 535 try { 536 readGuard.wait(remainingTime); 537 } catch (InterruptedException ignore) { } 538 } 539 remainingTime = (start + receiveTimeout) - System.currentTimeMillis(); 540 } while (remainingTime > 0); 541 } 542 543 throw new SocketTimeoutException ("timeout " + DataConverter.toFormatedDuration(receiveTimeout) + " reached"); 544 } 545 546 547 548 551 public String readStringByDelimiter(String delimiter) throws IOException , ClosedConnectionException, UnsupportedEncodingException , SocketTimeoutException { 552 return readStringByDelimiter(delimiter, Integer.MAX_VALUE); 553 } 554 555 556 559 public String readStringByDelimiter(String delimiter, int maxLength) throws IOException ,ClosedConnectionException ,java.io.UnsupportedEncodingException ,SocketTimeoutException ,MaxReadSizeExceededException { 560 return readStringByDelimiter(delimiter, getDefaultEncoding(), maxLength); 561 }; 562 563 564 567 public String readStringByDelimiter(String delimiter, String encoding) throws IOException , ClosedConnectionException, UnsupportedEncodingException , SocketTimeoutException { 568 return readStringByDelimiter(delimiter, encoding, Integer.MAX_VALUE); 569 } 570 571 574 public String readStringByDelimiter(String delimiter, String encoding, int maxLength) throws IOException , ClosedConnectionException, UnsupportedEncodingException , SocketTimeoutException , MaxReadSizeExceededException { 575 long start = System.currentTimeMillis(); 576 long remainingTime = receiveTimeout; 577 578 synchronized (readGuard) { 579 do { 580 try { 581 LinkedList <ByteBuffer > extracted = extractBytesByDelimiterFromReadQueue(delimiter.getBytes(encoding), maxLength); 582 return DataConverter.toString(extracted, encoding); 583 } catch (MaxReadSizeExceededException mle) { 584 throw mle; 585 586 } catch (BufferUnderflowException bue) { 587 try { 588 readGuard.wait(remainingTime); 589 } catch (InterruptedException ignore) { } 590 } 591 remainingTime = (start + receiveTimeout) - System.currentTimeMillis(); 592 } while (remainingTime > 0); 593 } 594 595 throw new SocketTimeoutException ("timeout " + DataConverter.toFormatedDuration(receiveTimeout) + " reached"); 596 } 597 598 599 602 public int getIndexOf(String str) throws IOException ,ClosedConnectionException, SocketTimeoutException { 603 return getIndexOf(str, Integer.MAX_VALUE); 604 } 605 606 609 public int getIndexOf(String str, int maxLength) throws IOException , ClosedConnectionException, MaxReadSizeExceededException, SocketTimeoutException { 610 return getIndexOf(str, getDefaultEncoding(), maxLength); 611 } 612 613 614 617 public int getIndexOf(String str, String encoding, int maxLength) throws IOException , ClosedConnectionException, MaxReadSizeExceededException { 618 long start = System.currentTimeMillis(); 619 long remainingTime = receiveTimeout; 620 621 synchronized (readGuard) { 622 do { 623 try { 624 return readIndexOf(str.getBytes(encoding), maxLength); 625 } catch (MaxReadSizeExceededException mle) { 626 throw mle; 627 628 } catch (BufferUnderflowException bue) { 629 try { 630 readGuard.wait(remainingTime); 631 } catch (InterruptedException ignore) { } 632 } 633 remainingTime = (start + receiveTimeout) - System.currentTimeMillis(); 634 } while (remainingTime > 0); 635 } 636 637 throw new SocketTimeoutException ("timeout " + DataConverter.toFormatedDuration(receiveTimeout) + " reached"); 638 } 639 640 641 644 public String readStringByLength(int length) throws IOException , ClosedConnectionException, SocketTimeoutException { 645 return readStringByLength(length, getDefaultEncoding()); 646 } 647 648 649 652 public String readStringByLength(int length, String encoding) throws IOException , ClosedConnectionException, SocketTimeoutException { 653 654 if (length <= 0) { 655 return null; 656 } 657 658 long start = System.currentTimeMillis(); 659 long remainingTime = receiveTimeout; 660 661 synchronized (readGuard) { 662 do { 663 try { 664 LinkedList <ByteBuffer > extracted = extractBytesByLength(length); 665 return DataConverter.toString(extracted, encoding); 666 } catch (BufferUnderflowException bue) { 667 try { 668 readGuard.wait(remainingTime); 669 } catch (InterruptedException ignore) { } 670 } 671 remainingTime = (start + receiveTimeout) - System.currentTimeMillis(); 672 } while (remainingTime > 0); 673 } 674 675 throw new SocketTimeoutException ("timeout " + DataConverter.toFormatedDuration(receiveTimeout) + " reached"); 676 } 677 678 679 public IBlockingConnection setOption(String name, Object value) throws IOException { 680 return (IBlockingConnection) super.setOption(name, value); 681 } 682 683 @Override 684 protected int onDataEvent() { 685 686 int addSize = super.onDataEvent(); 688 689 if (addSize > 0) { 690 synchronized (readGuard) { 691 readGuard.notify(); 692 } 693 } 694 695 return addSize; 696 } 697 } 698 | Popular Tags |