KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > com > sun > corba > se > impl > encoding > BufferManagerReadStream


/*
 * @(#)BufferManagerReadStream.java 1.13 03/12/19
 *
 * Copyright 2004 Sun Microsystems, Inc. All rights reserved.
 * SUN PROPRIETARY/CONFIDENTIAL. Use is subject to license terms.
 */

7 package com.sun.corba.se.impl.encoding;
8
9 import java.nio.ByteBuffer JavaDoc;
10 import com.sun.corba.se.pept.transport.ByteBufferPool;
11 import com.sun.corba.se.spi.logging.CORBALogDomains;
12 import com.sun.corba.se.spi.orb.ORB;
13 import com.sun.corba.se.impl.logging.ORBUtilSystemException;
14 import com.sun.corba.se.impl.orbutil.ORBUtility;
15 import com.sun.corba.se.impl.protocol.RequestCanceledException;
16 import com.sun.corba.se.impl.protocol.giopmsgheaders.FragmentMessage;
17 import com.sun.corba.se.impl.protocol.giopmsgheaders.Message;
18 import java.util.*;
19
20 public class BufferManagerReadStream
21     implements BufferManagerRead, MarkAndResetHandler
22 {
23     private boolean receivedCancel = false;
24     private int cancelReqId = 0;
25
26     // We should convert endOfStream to a final static dummy end node
27
private boolean endOfStream = true;
28     private BufferQueue fragmentQueue = new BufferQueue();
29
30     // REVISIT - This should go in BufferManagerRead. But, since
31
// BufferManagerRead is an interface. BufferManagerRead
32
// might ought to be an abstract class instead of an
33
// interface.
34
private ORB orb ;
35     private ORBUtilSystemException wrapper ;
36     private boolean debug = false;
37
38     BufferManagerReadStream( ORB orb )
39     {
40     this.orb = orb ;
41     this.wrapper = ORBUtilSystemException.get( orb,
42         CORBALogDomains.RPC_ENCODING ) ;
43         debug = orb.transportDebugFlag;
44     }
45
46     public void cancelProcessing(int requestId) {
47         synchronized(fragmentQueue) {
48             receivedCancel = true;
49             cancelReqId = requestId;
50             fragmentQueue.notify();
51         }
52     }
53
54     public void processFragment(ByteBuffer JavaDoc byteBuffer, FragmentMessage msg)
55     {
56         ByteBufferWithInfo bbwi =
57             new ByteBufferWithInfo(orb, byteBuffer, msg.getHeaderLength());
58
59         synchronized (fragmentQueue) {
60             if (debug)
61             {
62                 // print address of ByteBuffer being queued
63
int bbAddress = System.identityHashCode(byteBuffer);
64                 StringBuffer JavaDoc sb = new StringBuffer JavaDoc(80);
65                 sb.append("processFragment() - queueing ByteBuffer id (");
66                 sb.append(bbAddress).append(") to fragment queue.");
67                 String JavaDoc strMsg = sb.toString();
68                 dprint(strMsg);
69             }
70             fragmentQueue.enqueue(bbwi);
71             endOfStream = !msg.moreFragmentsToFollow();
72             fragmentQueue.notify();
73         }
74     }
75  
76     public ByteBufferWithInfo underflow (ByteBufferWithInfo bbwi)
77     {
78
79       ByteBufferWithInfo result = null;
80
81       try {
82       //System.out.println("ENTER underflow");
83

84         synchronized (fragmentQueue) {
85
86             if (receivedCancel) {
87                 throw new RequestCanceledException(cancelReqId);
88             }
89
90             while (fragmentQueue.size() == 0) {
91
92                 if (endOfStream) {
93             throw wrapper.endOfStream() ;
94                 }
95
96                 try {
97                     fragmentQueue.wait();
98                 } catch (InterruptedException JavaDoc e) {}
99
100                 if (receivedCancel) {
101                     throw new RequestCanceledException(cancelReqId);
102                 }
103             }
104
105             result = fragmentQueue.dequeue();
106             result.fragmented = true;
107
108             if (debug)
109             {
110                 // print address of ByteBuffer being dequeued
111
int bbAddr = System.identityHashCode(result.byteBuffer);
112                 StringBuffer JavaDoc sb1 = new StringBuffer JavaDoc(80);
113                 sb1.append("underflow() - dequeued ByteBuffer id (");
114                 sb1.append(bbAddr).append(") from fragment queue.");
115                 String JavaDoc msg1 = sb1.toString();
116                 dprint(msg1);
117             }
118
119             // VERY IMPORTANT
120
// Release bbwi.byteBuffer to the ByteBufferPool only if
121
// this BufferManagerStream is not marked for potential restore.
122
if (markEngaged == false && bbwi != null && bbwi.byteBuffer != null)
123             {
124                 ByteBufferPool byteBufferPool = getByteBufferPool();
125
126                 if (debug)
127                 {
128                     // print address of ByteBuffer being released
129
int bbAddress = System.identityHashCode(bbwi.byteBuffer);
130                     StringBuffer JavaDoc sb = new StringBuffer JavaDoc(80);
131                     sb.append("underflow() - releasing ByteBuffer id (");
132                     sb.append(bbAddress).append(") to ByteBufferPool.");
133                     String JavaDoc msg = sb.toString();
134                     dprint(msg);
135                 }
136
137                 byteBufferPool.releaseByteBuffer(bbwi.byteBuffer);
138                 bbwi.byteBuffer = null;
139                 bbwi = null;
140             }
141         }
142         return result;
143       } finally {
144       //System.out.println("EXIT underflow");
145
}
146     }
147
148     public void init(Message msg) {
149         if (msg != null)
150             endOfStream = !msg.moreFragmentsToFollow();
151     }
152
153     // Release any queued ByteBufferWithInfo's byteBuffers to the
154
// ByteBufferPoool
155
public void close(ByteBufferWithInfo bbwi)
156     {
157         int inputBbAddress = 0;
158
159         // release ByteBuffers on fragmentQueue
160
if (fragmentQueue != null)
161         {
162             synchronized (fragmentQueue)
163             {
164                 // IMPORTANT: The fragment queue may have one ByteBuffer
165
// on it that's also on the CDRInputStream if
166
// this method is called when the stream is 'marked'.
167
// Thus, we'll compare the ByteBuffer passed
168
// in (from a CDRInputStream) with all ByteBuffers
169
// on the stack. If one is found to equal, it will
170
// not be released to the ByteBufferPool.
171
if (bbwi != null)
172                 {
173                     inputBbAddress = System.identityHashCode(bbwi.byteBuffer);
174                 }
175
176                 ByteBufferWithInfo abbwi = null;
177                 ByteBufferPool byteBufferPool = getByteBufferPool();
178                 while (fragmentQueue.size() != 0)
179                 {
180                     abbwi = fragmentQueue.dequeue();
181                     if (abbwi != null && abbwi.byteBuffer != null)
182                     {
183                         int bbAddress = System.identityHashCode(abbwi.byteBuffer);
184                         if (inputBbAddress != bbAddress)
185                         {
186                             if (debug)
187                             {
188                                  // print address of ByteBuffer released
189
StringBuffer JavaDoc sb = new StringBuffer JavaDoc(80);
190                                  sb.append("close() - fragmentQueue is ")
191                                    .append("releasing ByteBuffer id (")
192                                    .append(bbAddress).append(") to ")
193                                    .append("ByteBufferPool.");
194                                  String JavaDoc msg = sb.toString();
195                                  dprint(msg);
196                             }
197                         }
198                         byteBufferPool.releaseByteBuffer(abbwi.byteBuffer);
199                     }
200                 }
201             }
202             fragmentQueue = null;
203         }
204
205         // release ByteBuffers on fragmentStack
206
if (fragmentStack != null && fragmentStack.size() != 0)
207         {
208             // IMPORTANT: The fragment stack may have one ByteBuffer
209
// on it that's also on the CDRInputStream if
210
// this method is called when the stream is 'marked'.
211
// Thus, we'll compare the ByteBuffer passed
212
// in (from a CDRInputStream) with all ByteBuffers
213
// on the stack. If one is found to equal, it will
214
// not be released to the ByteBufferPool.
215
if (bbwi != null)
216             {
217                 inputBbAddress = System.identityHashCode(bbwi.byteBuffer);
218             }
219
220             ByteBufferWithInfo abbwi = null;
221             ByteBufferPool byteBufferPool = getByteBufferPool();
222             ListIterator itr = fragmentStack.listIterator();
223             while (itr.hasNext())
224             {
225                 abbwi = (ByteBufferWithInfo)itr.next();
226
227                 if (abbwi != null && abbwi.byteBuffer != null)
228                 {
229                    int bbAddress = System.identityHashCode(abbwi.byteBuffer);
230                    if (inputBbAddress != bbAddress)
231                    {
232                        if (debug)
233                        {
234                             // print address of ByteBuffer being released
235
StringBuffer JavaDoc sb = new StringBuffer JavaDoc(80);
236                             sb.append("close() - fragmentStack - releasing ")
237                               .append("ByteBuffer id (" + bbAddress + ") to ")
238                               .append("ByteBufferPool.");
239                             String JavaDoc msg = sb.toString();
240                             dprint(msg);
241                        }
242                        byteBufferPool.releaseByteBuffer(abbwi.byteBuffer);
243                    }
244                 }
245             }
246             fragmentStack = null;
247         }
248
249     }
250
251     protected ByteBufferPool getByteBufferPool()
252     {
253         return orb.getByteBufferPool();
254     }
255
256     private void dprint(String JavaDoc msg)
257     {
258         ORBUtility.dprint("BufferManagerReadStream", msg);
259     }
260
261     // Mark and reset handler ----------------------------------------
262

263     private boolean markEngaged = false;
264
265     // List of fragment ByteBufferWithInfos received since
266
// the mark was engaged.
267
private LinkedList fragmentStack = null;
268     private RestorableInputStream inputStream = null;
269
270     // Original state of the stream
271
private Object JavaDoc streamMemento = null;
272
273     public void mark(RestorableInputStream inputStream)
274     {
275         this.inputStream = inputStream;
276         markEngaged = true;
277
278         // Get the magic Object that the stream will use to
279
// reconstruct it's state when reset is called
280
streamMemento = inputStream.createStreamMemento();
281
282         if (fragmentStack != null) {
283             fragmentStack.clear();
284         }
285     }
286
287     // Collects fragments received since the mark was engaged.
288
public void fragmentationOccured(ByteBufferWithInfo newFragment)
289     {
290         if (!markEngaged)
291             return;
292
293         if (fragmentStack == null)
294             fragmentStack = new LinkedList();
295
296         fragmentStack.addFirst(new ByteBufferWithInfo(newFragment));
297     }
298
299     public void reset()
300     {
301         if (!markEngaged) {
302             // REVISIT - call to reset without call to mark
303
return;
304         }
305
306         markEngaged = false;
307
308         // If we actually did peek across fragments, we need
309
// to push those fragments onto the front of the
310
// buffer queue.
311
if (fragmentStack != null && fragmentStack.size() != 0) {
312             ListIterator iter = fragmentStack.listIterator();
313
314             synchronized(fragmentQueue) {
315                 while (iter.hasNext()) {
316                     fragmentQueue.push((ByteBufferWithInfo)iter.next());
317                 }
318             }
319
320             fragmentStack.clear();
321         }
322
323         // Give the stream the magic Object to restore
324
// it's state.
325
inputStream.restoreInternalState(streamMemento);
326     }
327
328     public MarkAndResetHandler getMarkAndResetHandler() {
329         return this;
330     }
331 }
332
Popular Tags