1 55 56 package org.apache.bsf.debug.util; 57 58 import java.util.*; 59 import java.net.*; 60 import java.io.*; 61 62 import org.apache.bsf.debug.*; 63 64 public abstract class SocketConnection { 65 66 Vector m_rcells; Hashtable m_tcells; IntHashtable m_tcellsById; 69 70 private boolean keep_listening; 71 int fCmdIdGenerator; 72 73 IntHashtable m_skeletons; 74 protected StubTable fStubs; 75 76 protected InputStream fInputStream; 77 protected OutputStream fOutputStream; 78 79 protected DataInputStream fDataInputStream; 80 protected DataOutputStream fDataOutputStream; 81 82 protected SocketConnection() { 83 m_skeletons = new IntHashtable(); 84 fStubs = null; 85 m_rcells = new Vector(); 86 m_tcells = new Hashtable(); 87 m_tcellsById = new IntHashtable(); 88 89 if (ThreadCell.isServer) { 90 fCmdIdGenerator = 10000; 91 } else { 92 fCmdIdGenerator = 90000; 93 } 94 } 95 96 public void exportSkeleton(Skeleton skel) { 97 skel.allocOid(this); 98 m_skeletons.put(skel.getUid(), skel); 99 } 100 101 public Skeleton getSkeleton(int uid) { 102 return (Skeleton) m_skeletons.get(uid); 103 } 104 105 public Stub getStub(int tid, int uid) { 106 return (Stub) fStubs.swizzle(tid, uid); 107 } 108 109 public void listen() { 110 ResultCell cell = null; 111 int cmdId, thId, count; 112 int classId; 113 int methodId; 114 byte bytes[]; 115 boolean errorOccured, isResult; 116 117 setListening(true); 118 119 while (keep_listening) { 120 try { 121 count = fDataInputStream.readInt(); 122 thId = fDataInputStream.readInt(); 124 errorOccured = fDataInputStream.readBoolean(); 126 cmdId = fDataInputStream.readInt(); 128 isResult = fDataInputStream.readBoolean(); 130 132 if (count != 0) { 133 bytes = new byte[count]; 134 if (count != fInputStream.read(bytes)) 135 throw new Error ("Wire Protocol Error"); 136 } 137 else bytes = new byte[0]; 138 139 if (errorOccured) { 140 receivedException(thId,cmdId,bytes); 141 } 142 else { 143 if (isResult) { 144 receivedResult(thId,cmdId,bytes); 146 } 147 else { 148 receivedInvocation(thId,cmdId,bytes); 150 } 151 } 152 } 153 catch (InterruptedIOException iioe) { 154 continue; 156 } 157 catch (Exception ex) { 158 wireExceptionNotify(ex); 159 } 160 } 161 } 162 163 public void stopListening() { 164 Enumeration e; 165 ResultCell cell; 166 167 e = m_rcells.elements(); 168 while (e.hasMoreElements()) { 169 cell = (ResultCell) e.nextElement(); 170 try { 171 cell.disconnected = true; 172 cell.completionNotify(); 173 } catch (Throwable t) { 174 t.printStackTrace(); 175 } 176 } 177 fStubs.disconnectNotify(); 178 setListening(false); 179 } 180 181 private synchronized void setListening(boolean listen) { 182 keep_listening = listen; 183 } 184 185 private void receivedException(int thId, int cmdId, byte bytes[]) 186 throws IOException { 187 Exception ex; 188 ResultCell cell; 189 190 if (ThreadCell.isServer) 191 DebugLog.stdoutPrintln("Received error from CLIENT...", 192 DebugLog.BSF_LOG_L2); 193 else 194 DebugLog.stdoutPrintln("Received error from SERVER...", 195 DebugLog.BSF_LOG_L2); 196 197 DebugLog.stdoutPrintln(" **** ERROR: thId=" + thId + 198 ", cmdId=" + cmdId, 199 DebugLog.BSF_LOG_L2); 200 cell = searchCell(cmdId); 201 cell.setPacketBytes(bytes); 202 cell.readException(); 203 cell.print(); 204 205 cell.thread.completionNotify(cell); 208 209 } 210 211 private void receivedResult(int thId, int cmdId, byte bytes[]) { 212 ResultCell cell; 213 DebugLog.stdoutPrintln(" <<<< RESULT: thId=" + 214 thId + ", cmdId=" + cmdId, 215 DebugLog.BSF_LOG_L3); 216 if (bytes!=null) 217 DebugLog.stdoutPrintln(" byte count=" + 218 bytes.length, DebugLog.BSF_LOG_L3); 219 else 220 DebugLog.stdoutPrintln(" no bytes", 221 DebugLog.BSF_LOG_L3); 222 223 cell = searchCell(cmdId); 224 cell.setPacketBytes(bytes); 225 cell.parseResult(); 226 cell.completionNotify(); 228 } 229 230 private synchronized void receivedInvocation(int thId, int cmdId, 231 byte bytes[]) 232 throws Exception { 233 234 DebugLog.stdoutPrintln(" >>>> INVOCATION: thId=" + 235 thId + ", cmdId=" + cmdId, 236 DebugLog.BSF_LOG_L3); 237 if (bytes!=null) 238 DebugLog.stdoutPrintln(" byte count=" + 239 bytes.length, 240 DebugLog.BSF_LOG_L3); 241 else 242 DebugLog.stdoutPrintln(" no bytes", 243 DebugLog.BSF_LOG_L3); 244 245 ResultCell rcell = new ResultCell(this); 246 m_rcells.addElement(rcell); 247 248 rcell.incomingInvocation(cmdId, bytes); 249 250 invoke(rcell, thId); 251 } 252 253 259 void completionNotify(ResultCell rcell) { 260 try { 261 rcell.sendResult(); 262 } catch (Exception ex) { 263 DebugLog.stdoutPrintln("Exception Raised while sending result.", 264 DebugLog.BSF_LOG_L0); 265 ex.printStackTrace(); 266 } 267 } 268 269 280 private void invoke(ResultCell rcell, int thId) { 281 282 ThreadCell tcell = (ThreadCell) m_tcellsById.get(thId); 283 if (tcell == null) { 284 Thread thread; 289 tcell = new ThreadCell(this, thId); 290 thread = tcell.getThread(); 291 m_tcells.put(thread, tcell); 292 m_tcellsById.put(thId, tcell); 293 } 294 tcell.pushLoopback(rcell); 295 } 296 297 306 public synchronized ResultCell prepareOutgoingInvoke(Stub self, 307 int classId, 308 int methodId) 309 throws IOException { 310 311 ThreadCell tcell; 312 ResultCell rcell; 313 314 Thread thread; 315 thread = Thread.currentThread(); 316 tcell = (ThreadCell) m_tcells.get(thread); 317 if (tcell == null) { 318 tcell = new ThreadCell(this, thread); 319 m_tcells.put(thread, tcell); 320 m_tcellsById.put(tcell.getThId(), tcell); 321 } 322 323 rcell = new ResultCell(this); 324 325 int cmdId; 326 if (ThreadCell.isServer) 327 cmdId = ++fCmdIdGenerator; 328 else 329 cmdId = --fCmdIdGenerator; 330 331 rcell.outgoingInvocation(cmdId, classId, methodId, self); 332 m_rcells.addElement(rcell); 333 tcell.pushInvocation(rcell); 334 335 return rcell; 336 } 337 338 private boolean m_outStreamLocked = false; 339 private Object m_outStreamLock = new Object (); 340 341 void lockOutStream() { 342 synchronized (m_outStreamLock) { 343 while (m_outStreamLocked) { 344 try { 345 m_outStreamLock.wait(); 346 } catch (InterruptedException ex) { 347 348 } 349 } 350 m_outStreamLocked = true; 351 } 352 } 353 354 void releaseOutStream() { 355 synchronized (m_outStreamLock) { 356 m_outStreamLocked = false; 357 m_outStreamLock.notifyAll(); 358 } 359 } 360 361 void sendPacket(int thId, int cmdId, boolean isResult, 362 byte bytes[], boolean errorOccured) { 363 try { 364 synchronized (fDataOutputStream) { 365 fDataOutputStream.writeInt(bytes.length); 366 fDataOutputStream.writeInt(thId); 367 fDataOutputStream.writeBoolean(errorOccured); 368 fDataOutputStream.writeInt(cmdId); 369 fDataOutputStream.writeBoolean(isResult); 370 if (bytes.length != 0) { 371 fOutputStream.write(bytes); 372 } 373 } 374 } catch (Exception ex) { 375 DebugLog.stdoutPrintln("Exception during sending result...", 376 DebugLog.BSF_LOG_L0); 377 ex.printStackTrace(); 378 this.wireExceptionNotify(ex); 379 } 380 } 381 382 public synchronized ResultCell searchCell(int cmdId) { 383 ResultCell cell = null; 384 Enumeration e; 385 e = m_rcells.elements(); 386 while (e.hasMoreElements()) { 387 cell = (ResultCell) e.nextElement(); 388 if (cell.cmdId == cmdId) 389 break; 390 cell = null; 391 } 392 if (cell == null) 393 throw new Error ("Error in Wire Protocol, can't find CmdId=" + 394 cmdId); 395 return cell; 396 } 397 398 public Stub swizzle(int tid, int uid) { 399 return fStubs.swizzle(tid, uid); 400 } 401 402 protected abstract void dispatchInvocation(ResultCell rcell) 403 throws Exception ; 404 405 415 protected void wireExceptionNotify(Exception ex) { 416 DebugLog.stdoutPrintln("A WIRE exception occurred.", 417 DebugLog.BSF_LOG_L2); 418 DebugLog.stdoutPrintln(ex.toString(), 419 DebugLog.BSF_LOG_L2); 420 disconnectNotify(ex); 421 fStubs.disconnectNotify(); 422 setListening(false); 423 } 424 425 428 private synchronized void disconnectNotify(Exception ex) { 429 Enumeration e; 430 ResultCell cell; 431 DebugLog.stdoutPrintln("Raise the exception in all waiting threads...", 432 DebugLog.BSF_LOG_L2); 433 e = m_rcells.elements(); 434 while (e.hasMoreElements()) { 435 cell = (ResultCell) e.nextElement(); 436 try { 437 cell.setException(ex); 438 cell.disconnected = true; 439 cell.completionNotify(); 440 } catch (Throwable t) { 441 t.printStackTrace(); 442 } 443 } 444 m_rcells = new Vector(); 445 m_tcells = new Hashtable(); 446 DebugLog.stdoutPrintln("Done with raising exceptions...", 447 DebugLog.BSF_LOG_L2); 448 } 449 } 450 | Popular Tags |