1 20 package org.apache.mina.handler.support; 21 22 import java.io.IOException ; 23 import java.io.InputStream ; 24 25 import org.apache.mina.common.ByteBuffer; 26 import org.apache.mina.common.IoHandler; 27 import org.apache.mina.common.IoSession; 28 29 36 public class IoSessionInputStream extends InputStream { 37 private final Object mutex = new Object (); 38 39 private final ByteBuffer buf; 40 41 private volatile boolean closed; 42 43 private volatile boolean released; 44 45 private IOException exception; 46 47 public IoSessionInputStream() { 48 buf = ByteBuffer.allocate(16); 49 buf.setAutoExpand(true); 50 buf.limit(0); 51 } 52 53 public int available() { 54 if (released) { 55 return 0; 56 } else { 57 synchronized (mutex) { 58 return buf.remaining(); 59 } 60 } 61 } 62 63 public void close() { 64 if (closed) { 65 return; 66 } 67 68 synchronized (mutex) { 69 closed = true; 70 releaseBuffer(); 71 72 mutex.notifyAll(); 73 } 74 } 75 76 public int read() throws IOException { 77 synchronized (mutex) { 78 if (!waitForData()) { 79 return -1; 80 } 81 82 return buf.get() & 0xff; 83 } 84 } 85 86 public int read(byte[] b, int off, int len) throws IOException { 87 synchronized (mutex) { 88 if (!waitForData()) { 89 return -1; 90 } 91 92 int readBytes; 93 94 if (len > buf.remaining()) { 95 readBytes = buf.remaining(); 96 } else { 97 readBytes = len; 98 } 99 100 buf.get(b, off, readBytes); 101 102 return readBytes; 103 } 104 } 105 106 private boolean waitForData() throws IOException { 107 if (released) { 108 return false; 109 } 110 111 synchronized (mutex) { 112 while (!released && buf.remaining() == 0 && exception == null) { 113 try { 114 mutex.wait(); 115 } catch (InterruptedException e) { 116 IOException ioe = new IOException ( 117 "Interrupted while waiting for more data"); 118 ioe.initCause(e); 119 throw ioe; 120 } 121 } 122 } 123 124 if (exception != null) { 125 releaseBuffer(); 126 throw exception; 127 } 128 129 if (closed && buf.remaining() == 0) { 130 releaseBuffer(); 131 132 return false; 133 } 134 135 return true; 136 } 137 138 private void releaseBuffer() { 139 if (released) { 140 return; 141 } 142 143 released = true; 144 buf.release(); 145 } 146 147 public void write(ByteBuffer src) { 148 synchronized (mutex) { 149 if (closed) { 150 return; 151 } 152 153 if (buf.hasRemaining()) { 154 this.buf.compact(); 155 this.buf.put(src); 156 this.buf.flip(); 157 } else { 158 this.buf.clear(); 159 this.buf.put(src); 160 this.buf.flip(); 161 mutex.notifyAll(); 162 } 163 } 164 } 165 166 public void throwException(IOException e) { 167 synchronized (mutex) { 168 if (exception == null) { 169 exception = e; 170 171 mutex.notifyAll(); 172 } 173 } 174 } 175 } | Popular Tags |