1 18 package org.apache.activemq.transport.tcp; 19 20 import java.io.FilterInputStream ; 21 import java.io.IOException ; 22 import java.io.InputStream ; 23 28 public class TcpBufferedInputStream extends FilterInputStream { 29 private static final int DEFAULT_BUFFER_SIZE=8192; 30 protected byte internalBuffer[]; 31 protected int count; 32 protected int position; 33 34 public TcpBufferedInputStream(InputStream in){ 35 this(in,DEFAULT_BUFFER_SIZE); 36 } 37 38 public TcpBufferedInputStream(InputStream in,int size){ 39 super(in); 40 if(size<=0){ 41 throw new IllegalArgumentException ("Buffer size <= 0"); 42 } 43 internalBuffer=new byte[size]; 44 } 45 46 private void fill() throws IOException { 47 byte[] buffer=internalBuffer; 48 count=position=0; 49 int n=in.read(buffer,position,buffer.length-position); 50 if(n>0) 51 count=n+position; 52 } 53 54 public int read() throws IOException { 55 if(position>=count){ 56 fill(); 57 if(position>=count) 58 return -1; 59 } 60 return internalBuffer[position++]&0xff; 61 } 62 63 private int readStream(byte[] b,int off,int len) throws IOException { 64 int avail=count-position; 65 if(avail<=0){ 66 if(len>=internalBuffer.length){ 67 return in.read(b,off,len); 68 } 69 fill(); 70 avail=count-position; 71 if(avail<=0) 72 return -1; 73 } 74 int cnt=(avail<len)?avail:len; 75 System.arraycopy(internalBuffer,position,b,off,cnt); 76 position+=cnt; 77 return cnt; 78 } 79 80 public int read(byte b[],int off,int len) throws IOException { 81 if((off|len|(off+len)|(b.length-(off+len)))<0){ 82 throw new IndexOutOfBoundsException (); 83 }else if(len==0){ 84 return 0; 85 } 86 int n=0; 87 for(;;){ 88 int nread=readStream(b,off+n,len-n); 89 if(nread<=0) 90 return (n==0)?nread:n; 91 n+=nread; 92 if(n>=len) 93 return n; 94 InputStream input=in; 96 if(input!=null&&input.available()<=0) 97 return n; 98 } 99 } 100 101 public long skip(long n) throws IOException { 102 if(n<=0){ 103 return 0; 104 } 105 long avail=count-position; 106 if(avail<=0){ 107 return in.skip(n); 108 } 109 long skipped=(avail<n)?avail:n; 110 position+=skipped; 111 return skipped; 112 } 113 114 public int available() throws IOException { 115 return in.available()+(count-position); 116 } 117 118 public boolean markSupported(){ 119 return false; 120 } 121 122 public void close() throws IOException { 123 if(in!=null) 124 in.close(); 125 } 126 } 127 | Popular Tags |