KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > com > caucho > vfs > PipeStream


1 /*
2  * Copyright (c) 1998-2006 Caucho Technology -- all rights reserved
3  *
4  * This file is part of Resin(R) Open Source
5  *
6  * Each copy or derived work must preserve the copyright notice and this
7  * notice unmodified.
8  *
9  * Resin Open Source is free software; you can redistribute it and/or modify
10  * it under the terms of the GNU General Public License as published by
11  * the Free Software Foundation; either version 2 of the License, or
12  * (at your option) any later version.
13  *
14  * Resin Open Source is distributed in the hope that it will be useful,
15  * but WITHOUT ANY WARRANTY; without even the implied warranty of
16  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE, or any warranty
17  * of NON-INFRINGEMENT. See the GNU General Public License for more
18  * details.
19  *
20  * You should have received a copy of the GNU General Public License
21  * along with Resin Open Source; if not, write to the
22  * Free SoftwareFoundation, Inc.
23  * 59 Temple Place, Suite 330
24  * Boston, MA 02111-1307 USA
25  *
26  * @author Scott Ferguson
27  */

28
29 package com.caucho.vfs;
30
31 import java.io.IOException JavaDoc;
32 import java.io.InterruptedIOException JavaDoc;
33
34 /**
35  * Stream allowing two threads to read and write to each other.
36  */

37 public class PipeStream extends StreamImpl {
38   private PipeStream sibling;
39   private byte[] readBuffer;
40   private int readOffset;
41   private int readLength;
42
43   private PipeStream()
44   {
45     setPath(new NullPath("pipe"));
46     readBuffer = new byte[2 * TempBuffer.SIZE];
47     readOffset = 0;
48     readLength = 0;
49   }
50
51   /**
52    * Creates a pipe pair. The first object is a ReadStream, the second
53    * is a WriteStream.
54    */

55   public static Object JavaDoc []create()
56   {
57     PipeStream a = new PipeStream();
58     PipeStream b = new PipeStream();
59
60     a.sibling = b;
61     b.sibling = a;
62
63     return new Object JavaDoc[] { new ReadStream(a, null), new WriteStream(b) };
64   }
65
66   /**
67    * PipeStreams can read
68    */

69   public boolean canRead()
70   {
71     return true;
72   }
73
74   /**
75    * Reads the available bytes if any, otherwise block.
76    */

77   public int read(byte []buf, int offset, int length) throws IOException JavaDoc
78   {
79     if (readBuffer == null)
80       return 0;
81
82     synchronized (this) {
83       try {
84     if (readOffset >= readLength) {
85       // Sibling has closed
86
if (sibling.readBuffer == null)
87         return 0;
88
89       notifyAll();
90       wait();
91     }
92     
93     int sublen = readLength - readOffset;
94     if (sublen <= 0)
95       return 0;
96
97     if (length < sublen)
98       sublen = length;
99
100     System.arraycopy(readBuffer, readOffset, buf, offset, sublen);
101     readOffset += sublen;
102
103     return sublen;
104       } catch (InterruptedException JavaDoc e) {
105     throw new InterruptedIOException JavaDoc(e.getMessage());
106       }
107     }
108   }
109
110   /**
111    * Return the available bytes.
112    */

113   public int getAvailable() throws IOException JavaDoc
114   {
115     synchronized (this) {
116       return readLength - readOffset;
117     }
118   }
119
120   /**
121    * The pipe stream can write.
122    */

123   public boolean canWrite()
124   {
125     return true;
126   }
127
128   /**
129    * Implementation of the pipe write.
130    *
131    * @param buf byte buffer containing the bytes
132    * @param offset offset where to start writing
133    * @param length number of bytes to write
134    * @param isEnd true when the write is flushing a close.
135    */

136   public void write(byte []buf, int offset, int length, boolean isEnd)
137     throws IOException JavaDoc
138   {
139     while (length > 0) {
140       synchronized (sibling) {
141     if (sibling.readBuffer == null)
142       return;
143
144     if (sibling.readLength == sibling.readBuffer.length) {
145       if (sibling.readOffset < sibling.readLength) {
146         try {
147           sibling.wait();
148         } catch (InterruptedException JavaDoc e) {
149           throw new InterruptedIOException JavaDoc(e.getMessage());
150         }
151       }
152       sibling.readOffset = 0;
153       sibling.readLength = 0;
154     }
155
156         if (sibling.readOffset == sibling.readLength) {
157           sibling.readOffset = 0;
158           sibling.readLength = 0;
159         }
160
161     if (sibling.readBuffer == null)
162       return;
163
164     int sublen = sibling.readBuffer.length - sibling.readLength;
165     if (length < sublen)
166       sublen = length;
167
168     System.arraycopy(buf, offset,
169              sibling.readBuffer, sibling.readLength, sublen);
170
171     sibling.readLength += sublen;
172
173     length -= sublen;
174     offset += sublen;
175
176     sibling.notifyAll();
177       }
178     }
179   }
180
181   public void close() throws IOException JavaDoc
182   {
183     if (readBuffer == null)
184       return;
185
186     synchronized (this) {
187       readBuffer = null;
188       readLength = 0;
189       readOffset = 0;
190     
191       notifyAll();
192     }
193
194     synchronized (sibling) {
195       sibling.notifyAll();
196     }
197   }
198 }
199
Popular Tags