1 28 29 package com.caucho.vfs; 30 31 import java.io.IOException ; 32 import java.io.InterruptedIOException ; 33 34 37 public class PipeStream extends StreamImpl { 38 private PipeStream sibling; 39 private byte[] readBuffer; 40 private int readOffset; 41 private int readLength; 42 43 private PipeStream() 44 { 45 setPath(new NullPath("pipe")); 46 readBuffer = new byte[2 * TempBuffer.SIZE]; 47 readOffset = 0; 48 readLength = 0; 49 } 50 51 55 public static Object []create() 56 { 57 PipeStream a = new PipeStream(); 58 PipeStream b = new PipeStream(); 59 60 a.sibling = b; 61 b.sibling = a; 62 63 return new Object [] { new ReadStream(a, null), new WriteStream(b) }; 64 } 65 66 69 public boolean canRead() 70 { 71 return true; 72 } 73 74 77 public int read(byte []buf, int offset, int length) throws IOException 78 { 79 if (readBuffer == null) 80 return 0; 81 82 synchronized (this) { 83 try { 84 if (readOffset >= readLength) { 85 if (sibling.readBuffer == null) 87 return 0; 88 89 notifyAll(); 90 wait(); 91 } 92 93 int sublen = readLength - readOffset; 94 if (sublen <= 0) 95 return 0; 96 97 if (length < sublen) 98 sublen = length; 99 100 System.arraycopy(readBuffer, readOffset, buf, offset, sublen); 101 readOffset += sublen; 102 103 return sublen; 104 } catch (InterruptedException e) { 105 throw new InterruptedIOException (e.getMessage()); 106 } 107 } 108 } 109 110 113 public int getAvailable() throws IOException 114 { 115 synchronized (this) { 116 return readLength - readOffset; 117 } 118 } 119 120 123 public boolean canWrite() 124 { 125 return true; 126 } 127 128 136 public void write(byte []buf, int offset, int length, boolean isEnd) 137 throws IOException 138 { 139 while (length > 0) { 140 synchronized (sibling) { 141 if (sibling.readBuffer == null) 142 return; 143 144 if (sibling.readLength == sibling.readBuffer.length) { 145 if (sibling.readOffset < sibling.readLength) { 146 try { 147 sibling.wait(); 148 } catch (InterruptedException e) { 149 throw new InterruptedIOException (e.getMessage()); 150 } 151 } 152 sibling.readOffset = 0; 153 sibling.readLength = 0; 154 } 155 156 if (sibling.readOffset == sibling.readLength) { 157 sibling.readOffset = 0; 158 sibling.readLength = 0; 159 } 160 161 if (sibling.readBuffer == null) 162 return; 163 164 int sublen = sibling.readBuffer.length - sibling.readLength; 165 if (length < sublen) 166 sublen = length; 167 168 System.arraycopy(buf, offset, 169 sibling.readBuffer, sibling.readLength, sublen); 170 171 sibling.readLength += sublen; 172 173 length -= sublen; 174 offset += sublen; 175 176 sibling.notifyAll(); 177 } 178 } 179 } 180 181 public void close() throws IOException 182 { 183 if (readBuffer == null) 184 return; 185 186 synchronized (this) { 187 readBuffer = null; 188 readLength = 0; 189 readOffset = 0; 190 191 notifyAll(); 192 } 193 194 synchronized (sibling) { 195 sibling.notifyAll(); 196 } 197 } 198 } 199 | Popular Tags |