1 18 package org.apache.activemq.transport.nio; 19 20 import java.io.EOFException ; 21 import java.io.IOException ; 22 import java.io.InterruptedIOException ; 23 import java.io.OutputStream ; 24 import java.nio.ByteBuffer ; 25 import java.nio.channels.WritableByteChannel ; 26 27 32 33 public class NIOOutputStream extends OutputStream { 34 35 private final static int BUFFER_SIZE = 8192; 36 37 private final WritableByteChannel out; 38 private final byte[] buffer; 39 private final ByteBuffer byteBuffer; 40 41 private int count; 42 private boolean closed; 43 44 49 public NIOOutputStream(WritableByteChannel out) { 50 this(out, BUFFER_SIZE); 51 } 52 53 61 public NIOOutputStream(WritableByteChannel out, int size) { 62 this.out = out; 63 if (size <= 0) { 64 throw new IllegalArgumentException ("Buffer size <= 0"); 65 } 66 buffer = new byte[size]; 67 byteBuffer = ByteBuffer.wrap(buffer); 68 } 69 70 76 public void write(int b) throws IOException { 77 checkClosed(); 78 if (availableBufferToWrite() < 1) { 79 flush(); 80 } 81 buffer[count++] = (byte) b; 82 } 83 84 85 93 public void write(byte b[], int off, int len) throws IOException { 94 checkClosed(); 95 if (availableBufferToWrite() < len) { 96 flush(); 97 } 98 if (buffer.length >= len) { 99 System.arraycopy(b, off, buffer, count, len); 100 count += len; 101 } 102 else { 103 write( ByteBuffer.wrap(b, off, len)); 104 } 105 } 106 107 114 public void flush() throws IOException { 115 if (count > 0 && out != null) { 116 byteBuffer.position(0); 117 byteBuffer.limit(count); 118 write(byteBuffer); 119 count = 0; 120 } 121 } 122 123 128 public void close() throws IOException { 129 super.close(); 130 closed = true; 131 } 132 133 134 139 protected void checkClosed() throws IOException { 140 if (closed) { 141 throw new EOFException ("Cannot write to the stream any more it has already been closed"); 142 } 143 } 144 145 148 private int availableBufferToWrite() { 149 return buffer.length - count; 150 } 151 152 protected void write(ByteBuffer data) throws IOException { 153 int remaining = data.remaining(); 154 int lastRemaining = remaining-1; 155 long delay=1; 156 while( remaining > 0 ) { 157 158 if( remaining == lastRemaining ) { 161 try { 162 Thread.sleep(delay); 164 delay *= 2; 165 if( delay > 1000 ) { 166 delay = 1000; 167 } 168 } catch (InterruptedException e) { 169 throw new InterruptedIOException (); 170 } 171 } else { 172 delay = 1; 173 } 174 lastRemaining = remaining; 175 176 out.write( data ); 178 remaining = data.remaining(); 179 } 180 } 181 182 } 183 | Popular Tags |