1 22 package netscape.ldap; 23 24 import java.util.*; 25 import netscape.ldap.client.*; 26 import netscape.ldap.client.opers.*; 27 import netscape.ldap.ber.stream.*; 28 import netscape.ldap.util.*; 29 import java.io.*; 30 import java.net.*; 31 import java.text.SimpleDateFormat ; 32 33 52 class LDAPConnThread extends Thread { 53 54 57 private final static int MAXMSGID = Integer.MAX_VALUE; 58 private final static int BACKLOG_CHKCNT = 50; 59 60 63 transient private int m_highMsgId; 64 transient private InputStream m_serverInput; 65 transient private OutputStream m_serverOutput; 66 transient private Hashtable m_requests; 67 transient private Hashtable m_messages = null; 68 transient private Vector m_registered; 69 transient private boolean m_disconnected = false; 70 transient private LDAPCache m_cache = null; 71 transient private boolean m_doRun = true; 72 private Socket m_socket = null; 73 transient private Thread m_thread = null; 74 transient Object m_sendRequestLock = new Object (); 75 transient LDAPConnSetupMgr m_connMgr = null; 76 transient Object m_traceOutput = null; 77 transient private int m_backlogCheckCounter = BACKLOG_CHKCNT; 78 79 80 static SimpleDateFormat m_timeFormat = new SimpleDateFormat ("HH:mm:ss.SSS"); 82 83 90 public LDAPConnThread(LDAPConnSetupMgr connMgr, LDAPCache cache, Object traceOutput) 91 throws LDAPException { 92 super("LDAPConnThread " + connMgr.getHost() +":"+ connMgr.getPort()); 93 m_requests = new Hashtable (); 94 m_registered = new Vector (); 95 m_connMgr = connMgr; 96 m_socket = connMgr.getSocket(); 97 setCache( cache ); 98 setTraceOutput(traceOutput); 99 100 setDaemon(true); 101 102 try { 103 104 m_serverInput = new BufferedInputStream (m_socket.getInputStream()); 105 m_serverOutput = new BufferedOutputStream (m_socket.getOutputStream()); 106 107 } catch (IOException e) { 108 109 m_doRun = false; 115 start(); 116 throw new LDAPException ( "failed to connect to server " + 117 m_connMgr.getHost(), LDAPException.CONNECT_ERROR ); 118 } 119 120 if (traceOutput != null) { 121 StringBuffer sb = new StringBuffer (" connected to "); 122 sb.append(m_connMgr.getLDAPUrl()); 123 logTraceMessage(sb); 124 } 125 126 start(); 127 } 128 129 InputStream getInputStream() { 130 return m_serverInput; 131 } 132 133 void setInputStream( InputStream is ) { 134 m_serverInput = is; 135 } 136 137 OutputStream getOutputStream() { 138 return m_serverOutput; 139 } 140 141 void setOutputStream( OutputStream os ) { 142 m_serverOutput = os; 143 } 144 145 void setTraceOutput(Object traceOutput) { 146 synchronized (m_sendRequestLock) { 147 if (traceOutput == null) { 148 m_traceOutput = null; 149 } 150 else if (traceOutput instanceof OutputStream) { 151 m_traceOutput = new PrintWriter((OutputStream)traceOutput); 152 } 153 else if (traceOutput instanceof LDAPTraceWriter) { 154 m_traceOutput = traceOutput; 155 } 156 } 157 } 158 159 void logTraceMessage(StringBuffer msg) { 160 161 String timeStamp = m_timeFormat.format(new Date()); 162 StringBuffer sb = new StringBuffer (timeStamp); 163 sb.append(" ldc="); 164 sb.append(m_connMgr.getID()); 165 166 synchronized( m_sendRequestLock ) { 167 if (m_traceOutput instanceof PrintWriter) { 168 PrintWriter traceOutput = (PrintWriter)m_traceOutput; 169 traceOutput.print(sb); traceOutput.println(msg); 171 traceOutput.flush(); 172 } 173 else if (m_traceOutput instanceof LDAPTraceWriter) { 174 sb.append(msg); 175 ((LDAPTraceWriter)m_traceOutput).write(sb.toString()); 176 } 177 } 178 } 179 180 184 synchronized void setCache( LDAPCache cache ) { 185 m_cache = cache; 186 m_messages = (m_cache != null) ? new Hashtable() : null; 187 } 188 189 194 private int allocateId () { 195 synchronized (m_sendRequestLock) { 196 m_highMsgId = (m_highMsgId + 1) % MAXMSGID; 197 return m_highMsgId; 198 } 199 } 200 201 207 void sendRequest (LDAPConnection conn, JDAPProtocolOp request, 208 LDAPMessageQueue toNotify, LDAPConstraints cons) 209 throws LDAPException { 210 if (!m_doRun) { 211 throw new LDAPException ( "not connected to a server", 212 LDAPException.SERVER_DOWN ); 213 } 214 LDAPMessage msg = 215 new LDAPMessage(allocateId(), request, cons.getServerControls()); 216 217 if ( toNotify != null ) { 218 if (!(request instanceof JDAPAbandonRequest || 219 request instanceof JDAPUnbindRequest)) { 220 221 this.m_requests.put (new Integer (msg.getMessageID()), toNotify); 222 224 resultRetrieved(); 225 } 226 toNotify.addRequest(msg.getMessageID(), conn, this, cons.getTimeLimit()); 227 } 228 229 synchronized( m_sendRequestLock ) { 230 try { 231 if (m_traceOutput != null) { 232 logTraceMessage(msg.toTraceString()); 233 } 234 msg.write (m_serverOutput); 235 m_serverOutput.flush (); 236 } catch (IOException e) { 237 networkError(e); 238 } 239 } 240 } 241 242 246 public synchronized void register(LDAPConnection conn) { 247 if (!m_registered.contains(conn)) 248 m_registered.addElement(conn); 249 } 250 251 int getClientCount() { 252 return m_registered.size(); 253 } 254 255 boolean isRunning() { 256 return m_doRun; 257 } 258 259 264 public synchronized void deregister(LDAPConnection conn) { 265 m_registered.removeElement(conn); 266 if (m_registered.size() == 0) { 267 try { 268 269 if (!m_disconnected) { 270 LDAPSearchConstraints cons = conn.getSearchConstraints(); 271 sendRequest (null, new JDAPUnbindRequest (), null, cons); 272 } 273 274 m_doRun =false; 276 277 if ( m_thread != null && m_thread != Thread.currentThread()) { 278 279 m_thread.interrupt(); 280 281 try { 285 wait(1000); 286 } 287 catch (InterruptedException e) { 288 } 289 } 290 291 } catch (Exception e) { 292 LDAPConnection.printDebug(e.toString()); 293 } 294 finally { 295 cleanUp(); 296 } 297 } 298 } 299 300 303 private void cleanUp() { 304 if (!m_disconnected) { 305 try { 306 m_serverOutput.close (); 307 } catch (Exception e) { 308 } finally { 309 m_serverOutput = null; 310 } 311 312 try { 313 m_serverInput.close (); 314 } catch (Exception e) { 315 } finally { 316 m_serverInput = null; 317 } 318 319 try { 320 m_socket.close (); 321 } catch (Exception e) { 322 } finally { 323 m_socket = null; 324 } 325 326 m_disconnected = true; 327 328 332 m_connMgr.disconnect(); 333 334 337 if (m_requests != null) { 338 Enumeration requests = m_requests.elements(); 339 while (requests.hasMoreElements()) { 340 LDAPMessageQueue listener = 341 (LDAPMessageQueue)requests.nextElement(); 342 listener.removeAllRequests(this); 343 } 344 } 345 346 353 354 if (m_registered != null) { 355 Vector registerCopy = (Vector)m_registered.clone(); 356 357 Enumeration cancelled = registerCopy.elements(); 358 359 while (cancelled.hasMoreElements ()) { 360 LDAPConnection c = (LDAPConnection)cancelled.nextElement(); 361 c.deregisterConnection(); 362 } 363 } 364 m_registered.clear(); 365 m_requests.clear(); 366 m_messages = null; 367 m_cache = null; 368 } 369 } 370 371 374 private void checkBacklog() throws InterruptedException { 375 376 while (true) { 377 378 if (m_requests.size() == 0) { 379 return; 380 } 381 382 Enumeration listeners = m_requests.elements(); 383 while( listeners.hasMoreElements() ) { 384 LDAPMessageQueue l = (LDAPMessageQueue)listeners.nextElement(); 385 386 if ( !(l instanceof LDAPSearchListener) ) { 389 return; 390 } 391 392 LDAPSearchListener sl = (LDAPSearchListener)l; 393 394 if (sl.getSearchConstraints() == null) { 396 return; 397 } 398 399 int slMaxBacklog = sl.getSearchConstraints().getMaxBacklog(); 400 int slBatchSize = sl.getSearchConstraints().getBatchSize(); 401 402 if (slMaxBacklog == 0) { 404 return; 405 } 406 407 if (!sl.isAsynchOp() && slBatchSize == 0) { 409 return; 410 } 411 412 if (sl.getMessageCount() < slMaxBacklog) { 415 return; 416 } 417 } 418 419 synchronized(this) { 420 wait(3000); 421 } 422 } 423 } 424 425 426 427 432 synchronized void resultRetrieved() { 433 notifyAll(); 434 } 435 436 439 public void run() { 440 441 if (!m_doRun) { 444 return; 445 } 446 447 m_thread = Thread.currentThread(); 448 LDAPMessage msg = null; 449 JDAPBERTagDecoder decoder = new JDAPBERTagDecoder(); 450 451 while (m_doRun) { 452 yield(); 453 int[] nread = new int[1]; 454 nread[0] = 0; 455 try { 456 457 if (--m_backlogCheckCounter <= 0) { 459 m_backlogCheckCounter = BACKLOG_CHKCNT; 460 checkBacklog(); 461 } 462 463 BERElement element = BERElement.getElement(decoder, 464 m_serverInput, 465 nread); 466 msg = LDAPMessage.parseMessage(element); 467 468 if (m_traceOutput != null) { 469 logTraceMessage(msg.toTraceString()); 470 } 471 472 processResponse (msg, nread[0]); 476 477 } catch (Exception e) { 478 if (m_doRun) { 479 networkError(e); 480 } 481 else { 482 synchronized (this) { 484 m_thread = null; 485 notifyAll(); 486 } 487 } 488 } 489 } 490 } 491 492 498 private void processResponse (LDAPMessage msg, int size) { 499 Integer messageID = new Integer (msg.getMessageID()); 500 LDAPMessageQueue l = (LDAPMessageQueue)m_requests.get (messageID); 501 if (l == null) { 502 return; 503 } 504 505 if ( ! l.isAsynchOp()) { 510 511 512 LDAPControl[] con = msg.getControls(); 513 if (con != null) { 514 int msgid = msg.getMessageID(); 515 LDAPConnection ldc = l.getConnection(msgid); 516 if (ldc != null) { 517 ldc.setResponseControls( this, 518 new LDAPResponseControl(ldc, msgid, con)); 519 } 520 } 521 } 522 523 if (m_cache != null && (l instanceof LDAPSearchListener)) { 524 cacheSearchResult((LDAPSearchListener)l, msg, size); 525 } 526 527 l.addMessage (msg); 528 529 if (msg instanceof LDAPResponse) { 530 m_requests.remove (messageID); 531 if (m_requests.size() == 0) { 532 m_backlogCheckCounter = BACKLOG_CHKCNT; 533 } 534 } 535 } 536 537 548 private synchronized void cacheSearchResult (LDAPSearchListener l, LDAPMessage msg, int size) { 549 Integer messageID = new Integer (msg.getMessageID()); 550 Long key = l.getKey(); 551 Vector v = null; 552 553 if ((m_cache == null) || (key == null)) { 554 return; 555 } 556 557 if (msg instanceof LDAPSearchResult) { 558 559 v = (Vector)m_messages.get(messageID); 561 if (v == null) { 562 m_messages.put(messageID, v = new Vector()); 563 v.addElement(new Long (0)); 564 } 565 566 if (((Long )v.firstElement()).longValue() == -1L) { 568 return; 569 } 570 571 long entrySize = ((Long )v.firstElement()).longValue() + size; 576 577 if (entrySize > m_cache.getSize()) { 581 v.removeAllElements(); 582 v.addElement(new Long (-1L)); 583 return; 584 } 585 586 v.setElementAt(new Long (entrySize), 0); 588 589 v.addElement(((LDAPSearchResult)msg).getEntry()); 592 593 } else if (msg instanceof LDAPSearchResultReference) { 594 595 v = (Vector)m_messages.get(messageID); 598 if (v == null) { 599 m_messages.put(messageID, v = new Vector()); 600 } 601 else { 602 v.removeAllElements(); 603 } 604 v.addElement(new Long (-1L)); 605 606 } else if (msg instanceof LDAPResponse) { 607 608 612 boolean fail = ((LDAPResponse)msg).getResultCode() > 0; 613 v = (Vector)m_messages.remove(messageID); 614 615 if (!fail) { 616 if (v == null) { 619 v = new Vector(); 620 v.addElement(new Long (0)); 621 } 622 623 if (((Long )v.firstElement()).longValue() != -1L) { 625 m_cache.addEntry(key, v); 626 } 627 } 628 } 629 } 630 631 635 void abandon (int id ) { 636 637 if (!m_doRun) { 638 return; 639 } 640 641 LDAPMessageQueue l = (LDAPMessageQueue)m_requests.remove(new Integer (id)); 642 if (m_messages != null) { 644 m_messages.remove(new Integer (id)); 645 } 646 if (l != null) { 647 l.removeRequest(id); 648 } 649 resultRetrieved(); } 651 652 658 LDAPMessageQueue changeListener (int id, LDAPMessageQueue toNotify) { 659 660 if (!m_doRun) { 661 toNotify.setException(this, new LDAPException("Server or network error", 662 LDAPException.SERVER_DOWN)); 663 return null; 664 } 665 return (LDAPMessageQueue) m_requests.put (new Integer (id), toNotify); 666 } 667 668 673 private synchronized void networkError (Exception e) { 674 675 m_doRun =false; 676 677 m_connMgr.invalidateConnection(); 679 680 try { 681 682 Enumeration requests = m_requests.elements(); 684 while (requests.hasMoreElements()) { 685 LDAPMessageQueue listener = 686 (LDAPMessageQueue)requests.nextElement(); 687 listener.setException(this, new LDAPException("Server or network error", 688 LDAPException.SERVER_DOWN)); 689 } 690 691 } catch (NullPointerException ee) { 692 System.err.println("Exception: "+ee.toString()); 693 } 694 695 cleanUp(); 696 } 697 } 698 | Popular Tags |