1 7 package com.sun.corba.se.impl.encoding; 8 9 import java.nio.ByteBuffer ; 10 import com.sun.corba.se.pept.transport.ByteBufferPool; 11 import com.sun.corba.se.spi.logging.CORBALogDomains; 12 import com.sun.corba.se.spi.orb.ORB; 13 import com.sun.corba.se.impl.logging.ORBUtilSystemException; 14 import com.sun.corba.se.impl.orbutil.ORBUtility; 15 import com.sun.corba.se.impl.protocol.RequestCanceledException; 16 import com.sun.corba.se.impl.protocol.giopmsgheaders.FragmentMessage; 17 import com.sun.corba.se.impl.protocol.giopmsgheaders.Message; 18 import java.util.*; 19 20 public class BufferManagerReadStream 21 implements BufferManagerRead, MarkAndResetHandler 22 { 23 private boolean receivedCancel = false; 24 private int cancelReqId = 0; 25 26 private boolean endOfStream = true; 28 private BufferQueue fragmentQueue = new BufferQueue(); 29 30 private ORB orb ; 35 private ORBUtilSystemException wrapper ; 36 private boolean debug = false; 37 38 BufferManagerReadStream( ORB orb ) 39 { 40 this.orb = orb ; 41 this.wrapper = ORBUtilSystemException.get( orb, 42 CORBALogDomains.RPC_ENCODING ) ; 43 debug = orb.transportDebugFlag; 44 } 45 46 public void cancelProcessing(int requestId) { 47 synchronized(fragmentQueue) { 48 receivedCancel = true; 49 cancelReqId = requestId; 50 fragmentQueue.notify(); 51 } 52 } 53 54 public void processFragment(ByteBuffer byteBuffer, FragmentMessage msg) 55 { 56 ByteBufferWithInfo bbwi = 57 new ByteBufferWithInfo(orb, byteBuffer, msg.getHeaderLength()); 58 59 synchronized (fragmentQueue) { 60 if (debug) 61 { 62 int bbAddress = System.identityHashCode(byteBuffer); 64 StringBuffer sb = new StringBuffer (80); 65 sb.append("processFragment() - queueing ByteBuffer id ("); 66 sb.append(bbAddress).append(") to fragment queue."); 67 String strMsg = sb.toString(); 68 dprint(strMsg); 69 } 70 fragmentQueue.enqueue(bbwi); 71 endOfStream = !msg.moreFragmentsToFollow(); 72 fragmentQueue.notify(); 73 } 74 } 75 76 public ByteBufferWithInfo underflow (ByteBufferWithInfo bbwi) 77 { 78 79 ByteBufferWithInfo result = null; 80 81 try { 82 84 synchronized (fragmentQueue) { 85 86 if (receivedCancel) { 87 throw new RequestCanceledException(cancelReqId); 88 } 89 90 while (fragmentQueue.size() == 0) { 91 92 if (endOfStream) { 93 throw wrapper.endOfStream() ; 94 } 95 96 try { 97 fragmentQueue.wait(); 98 } catch (InterruptedException e) {} 99 100 if (receivedCancel) { 101 throw new RequestCanceledException(cancelReqId); 102 } 103 } 104 105 result = fragmentQueue.dequeue(); 106 result.fragmented = true; 107 108 if (debug) 109 { 110 int bbAddr = System.identityHashCode(result.byteBuffer); 112 StringBuffer sb1 = new StringBuffer (80); 113 sb1.append("underflow() - dequeued ByteBuffer id ("); 114 sb1.append(bbAddr).append(") from fragment queue."); 115 String msg1 = sb1.toString(); 116 dprint(msg1); 117 } 118 119 if (markEngaged == false && bbwi != null && bbwi.byteBuffer != null) 123 { 124 ByteBufferPool byteBufferPool = getByteBufferPool(); 125 126 if (debug) 127 { 128 int bbAddress = System.identityHashCode(bbwi.byteBuffer); 130 StringBuffer sb = new StringBuffer (80); 131 sb.append("underflow() - releasing ByteBuffer id ("); 132 sb.append(bbAddress).append(") to ByteBufferPool."); 133 String msg = sb.toString(); 134 dprint(msg); 135 } 136 137 byteBufferPool.releaseByteBuffer(bbwi.byteBuffer); 138 bbwi.byteBuffer = null; 139 bbwi = null; 140 } 141 } 142 return result; 143 } finally { 144 } 146 } 147 148 public void init(Message msg) { 149 if (msg != null) 150 endOfStream = !msg.moreFragmentsToFollow(); 151 } 152 153 public void close(ByteBufferWithInfo bbwi) 156 { 157 int inputBbAddress = 0; 158 159 if (fragmentQueue != null) 161 { 162 synchronized (fragmentQueue) 163 { 164 if (bbwi != null) 172 { 173 inputBbAddress = System.identityHashCode(bbwi.byteBuffer); 174 } 175 176 ByteBufferWithInfo abbwi = null; 177 ByteBufferPool byteBufferPool = getByteBufferPool(); 178 while (fragmentQueue.size() != 0) 179 { 180 abbwi = fragmentQueue.dequeue(); 181 if (abbwi != null && abbwi.byteBuffer != null) 182 { 183 int bbAddress = System.identityHashCode(abbwi.byteBuffer); 184 if (inputBbAddress != bbAddress) 185 { 186 if (debug) 187 { 188 StringBuffer sb = new StringBuffer (80); 190 sb.append("close() - fragmentQueue is ") 191 .append("releasing ByteBuffer id (") 192 .append(bbAddress).append(") to ") 193 .append("ByteBufferPool."); 194 String msg = sb.toString(); 195 dprint(msg); 196 } 197 } 198 byteBufferPool.releaseByteBuffer(abbwi.byteBuffer); 199 } 200 } 201 } 202 fragmentQueue = null; 203 } 204 205 if (fragmentStack != null && fragmentStack.size() != 0) 207 { 208 if (bbwi != null) 216 { 217 inputBbAddress = System.identityHashCode(bbwi.byteBuffer); 218 } 219 220 ByteBufferWithInfo abbwi = null; 221 ByteBufferPool byteBufferPool = getByteBufferPool(); 222 ListIterator itr = fragmentStack.listIterator(); 223 while (itr.hasNext()) 224 { 225 abbwi = (ByteBufferWithInfo)itr.next(); 226 227 if (abbwi != null && abbwi.byteBuffer != null) 228 { 229 int bbAddress = System.identityHashCode(abbwi.byteBuffer); 230 if (inputBbAddress != bbAddress) 231 { 232 if (debug) 233 { 234 StringBuffer sb = new StringBuffer (80); 236 sb.append("close() - fragmentStack - releasing ") 237 .append("ByteBuffer id (" + bbAddress + ") to ") 238 .append("ByteBufferPool."); 239 String msg = sb.toString(); 240 dprint(msg); 241 } 242 byteBufferPool.releaseByteBuffer(abbwi.byteBuffer); 243 } 244 } 245 } 246 fragmentStack = null; 247 } 248 249 } 250 251 protected ByteBufferPool getByteBufferPool() 252 { 253 return orb.getByteBufferPool(); 254 } 255 256 private void dprint(String msg) 257 { 258 ORBUtility.dprint("BufferManagerReadStream", msg); 259 } 260 261 263 private boolean markEngaged = false; 264 265 private LinkedList fragmentStack = null; 268 private RestorableInputStream inputStream = null; 269 270 private Object streamMemento = null; 272 273 public void mark(RestorableInputStream inputStream) 274 { 275 this.inputStream = inputStream; 276 markEngaged = true; 277 278 streamMemento = inputStream.createStreamMemento(); 281 282 if (fragmentStack != null) { 283 fragmentStack.clear(); 284 } 285 } 286 287 public void fragmentationOccured(ByteBufferWithInfo newFragment) 289 { 290 if (!markEngaged) 291 return; 292 293 if (fragmentStack == null) 294 fragmentStack = new LinkedList(); 295 296 fragmentStack.addFirst(new ByteBufferWithInfo(newFragment)); 297 } 298 299 public void reset() 300 { 301 if (!markEngaged) { 302 return; 304 } 305 306 markEngaged = false; 307 308 if (fragmentStack != null && fragmentStack.size() != 0) { 312 ListIterator iter = fragmentStack.listIterator(); 313 314 synchronized(fragmentQueue) { 315 while (iter.hasNext()) { 316 fragmentQueue.push((ByteBufferWithInfo)iter.next()); 317 } 318 } 319 320 fragmentStack.clear(); 321 } 322 323 inputStream.restoreInternalState(streamMemento); 326 } 327 328 public MarkAndResetHandler getMarkAndResetHandler() { 329 return this; 330 } 331 } 332 | Popular Tags |