KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > netscape > ldap > LDAPConnThread


1 /* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*-
2  *
3  * The contents of this file are subject to the Netscape Public
4  * License Version 1.1 (the "License"); you may not use this file
5  * except in compliance with the License. You may obtain a copy of
6  * the License at http://www.mozilla.org/NPL/
7  *
8  * Software distributed under the License is distributed on an "AS
9  * IS" basis, WITHOUT WARRANTY OF ANY KIND, either express or
10  * implied. See the License for the specific language governing
11  * rights and limitations under the License.
12  *
13  * The Original Code is mozilla.org code.
14  *
15  * The Initial Developer of the Original Code is Netscape
16  * Communications Corporation. Portions created by Netscape are
17  * Copyright (C) 1999 Netscape Communications Corporation. All
18  * Rights Reserved.
19  *
20  * Contributor(s):
21  */

22 package netscape.ldap;
23
24 import java.util.*;
25 import netscape.ldap.client.*;
26 import netscape.ldap.client.opers.*;
27 import netscape.ldap.ber.stream.*;
28 import netscape.ldap.util.*;
29 import java.io.*;
30 import java.net.*;
31 import java.text.SimpleDateFormat JavaDoc;
32
33 /**
34  * Multiple LDAPConnection clones can share a single physical connection,
35  * which is maintained by a thread.
36  *
37  * +----------------+
38  * | LDAPConnection | --------+
39  * +----------------+ |
40  * |
41  * +----------------+ | +----------------+
42  * | LDAPConnection | --------+------- | LDAPConnThread |
43  * +----------------+ | +----------------+
44  * |
45  * +----------------+ |
46  * | LDAPConnection | --------+
47  * +----------------+
48  *
49  * All LDAPConnections send requests and get responses from
50  * LDAPConnThread (a thread).
51  */

52 class LDAPConnThread extends Thread JavaDoc {
53
54     /**
55      * Constants
56      */

57     private final static int MAXMSGID = Integer.MAX_VALUE;
58     private final static int BACKLOG_CHKCNT = 50;
59
60     /**
61      * Internal variables
62      */

63     transient private int m_highMsgId;
64     transient private InputStream m_serverInput;
65     transient private OutputStream m_serverOutput;
66     transient private Hashtable m_requests;
67     transient private Hashtable m_messages = null;
68     transient private Vector m_registered;
69     transient private boolean m_disconnected = false;
70     transient private LDAPCache m_cache = null;
71     transient private boolean m_doRun = true;
72     private Socket m_socket = null;
73     transient private Thread JavaDoc m_thread = null;
74     transient Object JavaDoc m_sendRequestLock = new Object JavaDoc();
75     transient LDAPConnSetupMgr m_connMgr = null;
76     transient Object JavaDoc m_traceOutput = null;
77     transient private int m_backlogCheckCounter = BACKLOG_CHKCNT;
78
79
80     // Time Stemp format Hour(0-23):Minute:Second.Milliseconds used for trace msgs
81
static SimpleDateFormat JavaDoc m_timeFormat = new SimpleDateFormat JavaDoc("HH:mm:ss.SSS");
82         
83     /**
84      * Constructs a connection thread that maintains connection to the
85      * LDAP server.
86      * @param host LDAP host name
87      * @param port LDAP port number
88      * @param factory LDAP socket factory
89      */

90     public LDAPConnThread(LDAPConnSetupMgr connMgr, LDAPCache cache, Object JavaDoc traceOutput)
91         throws LDAPException {
92         super("LDAPConnThread " + connMgr.getHost() +":"+ connMgr.getPort());
93         m_requests = new Hashtable ();
94         m_registered = new Vector ();
95         m_connMgr = connMgr;
96         m_socket = connMgr.getSocket();
97         setCache( cache );
98         setTraceOutput(traceOutput);
99        
100         setDaemon(true);
101         
102         try {
103
104             m_serverInput = new BufferedInputStream (m_socket.getInputStream());
105             m_serverOutput = new BufferedOutputStream (m_socket.getOutputStream());
106
107         } catch (IOException e) {
108
109             // a kludge to make the thread go away. Since the thread has already
110
// been created, the only way to clean up the thread is to call the
111
// start() method. Otherwise, the exit method will be never called
112
// because the start() was never called. In the run method, the stop
113
// method calls right away if the m_doRun is set to false.
114
m_doRun = false;
115             start();
116             throw new LDAPException ( "failed to connect to server " +
117                   m_connMgr.getHost(), LDAPException.CONNECT_ERROR );
118         }
119
120         if (traceOutput != null) {
121             StringBuffer JavaDoc sb = new StringBuffer JavaDoc(" connected to ");
122             sb.append(m_connMgr.getLDAPUrl());
123             logTraceMessage(sb);
124         }
125         
126         start(); /* start the thread */
127     }
128
129     InputStream getInputStream() {
130         return m_serverInput;
131     }
132
133     void setInputStream( InputStream is ) {
134         m_serverInput = is;
135     }
136
137     OutputStream getOutputStream() {
138         return m_serverOutput;
139     }
140
141     void setOutputStream( OutputStream os ) {
142         m_serverOutput = os;
143     }
144
145     void setTraceOutput(Object JavaDoc traceOutput) {
146         synchronized (m_sendRequestLock) {
147             if (traceOutput == null) {
148                m_traceOutput = null;
149             }
150             else if (traceOutput instanceof OutputStream) {
151                 m_traceOutput = new PrintWriter((OutputStream)traceOutput);
152             }
153             else if (traceOutput instanceof LDAPTraceWriter) {
154                 m_traceOutput = traceOutput;
155             }
156         }
157     }
158     
159     void logTraceMessage(StringBuffer JavaDoc msg) {
160
161         String JavaDoc timeStamp = m_timeFormat.format(new Date());
162         StringBuffer JavaDoc sb = new StringBuffer JavaDoc(timeStamp);
163         sb.append(" ldc=");
164         sb.append(m_connMgr.getID());
165
166         synchronized( m_sendRequestLock ) {
167             if (m_traceOutput instanceof PrintWriter) {
168                 PrintWriter traceOutput = (PrintWriter)m_traceOutput;
169                 traceOutput.print(sb); // header
170
traceOutput.println(msg);
171                 traceOutput.flush();
172             }
173             else if (m_traceOutput instanceof LDAPTraceWriter) {
174                 sb.append(msg);
175                 ((LDAPTraceWriter)m_traceOutput).write(sb.toString());
176             }
177         }
178     }
179     
180     /**
181      * Set the cache to use for searches.
182      * @param cache The cache to use for searches; <CODE>null</CODE> for no cache
183      */

184     synchronized void setCache( LDAPCache cache ) {
185         m_cache = cache;
186         m_messages = (m_cache != null) ? new Hashtable() : null;
187     }
188
189     /**
190      * Allocates a new LDAP message ID. These are arbitrary numbers used to
191      * correlate client requests with server responses.
192      * @return new unique msgId
193      */

194     private int allocateId () {
195         synchronized (m_sendRequestLock) {
196             m_highMsgId = (m_highMsgId + 1) % MAXMSGID;
197             return m_highMsgId;
198         }
199     }
200
201     /**
202      * Sends LDAP request via this connection thread.
203      * @param request request to send
204      * @param toNotify response listener to invoke when the response
205      * is ready
206      */

207     void sendRequest (LDAPConnection conn, JDAPProtocolOp request,
208         LDAPMessageQueue toNotify, LDAPConstraints cons)
209          throws LDAPException {
210         if (!m_doRun) {
211             throw new LDAPException ( "not connected to a server",
212                                   LDAPException.SERVER_DOWN );
213         }
214         LDAPMessage msg =
215             new LDAPMessage(allocateId(), request, cons.getServerControls());
216
217         if ( toNotify != null ) {
218             if (!(request instanceof JDAPAbandonRequest ||
219                   request instanceof JDAPUnbindRequest)) {
220                 /* Only worry about toNotify if we expect a response... */
221                 this.m_requests.put (new Integer JavaDoc (msg.getMessageID()), toNotify);
222                 /* Notify the backlog checker that there may be another outstanding
223                    request */

224                 resultRetrieved();
225             }
226             toNotify.addRequest(msg.getMessageID(), conn, this, cons.getTimeLimit());
227         }
228
229         synchronized( m_sendRequestLock ) {
230             try {
231                 if (m_traceOutput != null) {
232                     logTraceMessage(msg.toTraceString());
233                 }
234                 msg.write (m_serverOutput);
235                 m_serverOutput.flush ();
236             } catch (IOException e) {
237                 networkError(e);
238             }
239         }
240     }
241
242     /**
243      * Register with this connection thread.
244      * @param conn LDAP connection
245      */

246     public synchronized void register(LDAPConnection conn) {
247         if (!m_registered.contains(conn))
248             m_registered.addElement(conn);
249     }
250
251     int getClientCount() {
252         return m_registered.size();
253     }
254
255     boolean isRunning() {
256         return m_doRun;
257     }
258
259     /**
260      * De-Register with this connection thread. If all the connection
261      * is deregistered. Then, this thread should be killed.
262      * @param conn LDAP connection
263      */

264     public synchronized void deregister(LDAPConnection conn) {
265         m_registered.removeElement(conn);
266         if (m_registered.size() == 0) {
267             try {
268
269                 if (!m_disconnected) {
270                     LDAPSearchConstraints cons = conn.getSearchConstraints();
271                     sendRequest (null, new JDAPUnbindRequest (), null, cons);
272                 }
273                 
274                 // must be set after the call to sendRequest
275
m_doRun =false;
276                 
277                 if ( m_thread != null && m_thread != Thread.currentThread()) {
278                     
279                     m_thread.interrupt();
280                     
281                     // Wait up to 1 sec for thread to accept disconnect
282
// notification. When the interrupt is accepted,
283
// m_thread is set to null. See run() method.
284
try {
285                         wait(1000);
286                     }
287                     catch (InterruptedException JavaDoc e) {
288                     }
289                 }
290                 
291             } catch (Exception JavaDoc e) {
292                 LDAPConnection.printDebug(e.toString());
293             }
294             finally {
295                 cleanUp();
296             }
297         }
298     }
299
300     /**
301      * Clean ups before shutdown the thread.
302      */

303     private void cleanUp() {
304         if (!m_disconnected) {
305             try {
306                 m_serverOutput.close ();
307             } catch (Exception JavaDoc e) {
308             } finally {
309                 m_serverOutput = null;
310             }
311
312             try {
313                 m_serverInput.close ();
314             } catch (Exception JavaDoc e) {
315             } finally {
316                 m_serverInput = null;
317             }
318
319             try {
320                 m_socket.close ();
321             } catch (Exception JavaDoc e) {
322             } finally {
323                 m_socket = null;
324             }
325         
326             m_disconnected = true;
327
328             /**
329              * Notify the Connection Setup Manager that the connection was
330              * terminated by the user
331              */

332             m_connMgr.disconnect();
333         
334             /**
335              * Cancel all outstanding requests
336              */

337             if (m_requests != null) {
338                 Enumeration requests = m_requests.elements();
339                 while (requests.hasMoreElements()) {
340                     LDAPMessageQueue listener =
341                         (LDAPMessageQueue)requests.nextElement();
342                     listener.removeAllRequests(this);
343                 }
344             }
345
346             /**
347              * Notify all the registered about this bad moment.
348              * IMPORTANT: This needs to be done at last. Otherwise, the socket
349              * input stream and output stream might never get closed and the whole
350              * task will get stuck in the stop method when we tried to stop the
351              * LDAPConnThread.
352              */

353
354             if (m_registered != null) {
355                 Vector registerCopy = (Vector)m_registered.clone();
356
357                 Enumeration cancelled = registerCopy.elements();
358
359                 while (cancelled.hasMoreElements ()) {
360                     LDAPConnection c = (LDAPConnection)cancelled.nextElement();
361                     c.deregisterConnection();
362                 }
363             }
364             m_registered.clear();
365             m_requests.clear();
366             m_messages = null;
367             m_cache = null;
368         }
369     }
370
371     /**
372      * Sleep if there is a backlog of search results
373      */

374     private void checkBacklog() throws InterruptedException JavaDoc{
375
376         while (true) {
377
378             if (m_requests.size() == 0) {
379                 return;
380             }
381             
382             Enumeration listeners = m_requests.elements();
383             while( listeners.hasMoreElements() ) {
384                 LDAPMessageQueue l = (LDAPMessageQueue)listeners.nextElement();
385
386                 // If there are any threads waiting for a regular response
387
// message, we have to go read the next incoming message
388
if ( !(l instanceof LDAPSearchListener) ) {
389                     return;
390                 }
391
392                 LDAPSearchListener sl = (LDAPSearchListener)l;
393                 
394                 // should never happen, but just in case
395
if (sl.getSearchConstraints() == null) {
396                     return;
397                 }
398
399                 int slMaxBacklog = sl.getSearchConstraints().getMaxBacklog();
400                 int slBatchSize = sl.getSearchConstraints().getBatchSize();
401                 
402                 // Disabled backlog check ?
403
if (slMaxBacklog == 0) {
404                     return;
405                 }
406                 
407                 // Synch op with zero batch size ?
408
if (!sl.isAsynchOp() && slBatchSize == 0) {
409                     return;
410                 }
411                 
412                 // Max backlog not reached for at least one listener ?
413
// (if multiple requests are in progress)
414
if (sl.getMessageCount() < slMaxBacklog) {
415                     return;
416                 }
417             }
418                                    
419             synchronized(this) {
420                 wait(3000);
421             }
422         }
423     }
424
425
426
427     /**
428      * This is called when a search result has been retrieved from the incoming
429      * queue. We use the notification to unblock the listener thread, if it
430      * is waiting for the backlog to lighten.
431      */

432     synchronized void resultRetrieved() {
433         notifyAll();
434     }
435
436     /**
437      * Reads from the LDAP server input stream for incoming LDAP messages.
438      */

439     public void run() {
440         
441         // if there is a problem of establishing connection to the server,
442
// stop the thread right away...
443
if (!m_doRun) {
444             return;
445         }
446
447         m_thread = Thread.currentThread();
448         LDAPMessage msg = null;
449         JDAPBERTagDecoder decoder = new JDAPBERTagDecoder();
450
451         while (m_doRun) {
452             yield();
453             int[] nread = new int[1];
454             nread[0] = 0;
455             try {
456
457                 // Check after every BACKLOG_CHKCNT messages if the backlog is not too high
458
if (--m_backlogCheckCounter <= 0) {
459                     m_backlogCheckCounter = BACKLOG_CHKCNT;
460                     checkBacklog();
461                 }
462
463                 BERElement element = BERElement.getElement(decoder,
464                                                            m_serverInput,
465                                                            nread);
466                 msg = LDAPMessage.parseMessage(element);
467
468                 if (m_traceOutput != null) {
469                     logTraceMessage(msg.toTraceString());
470                 }
471
472                 // passed in the ber element size to approximate the size of the cache
473
// entry, thereby avoiding serialization of the entry stored in the
474
// cache
475
processResponse (msg, nread[0]);
476
477             } catch (Exception JavaDoc e) {
478                 if (m_doRun) {
479                     networkError(e);
480                 }
481                 else {
482                     // interrupted from deregister()
483
synchronized (this) {
484                         m_thread = null;
485                         notifyAll();
486                     }
487                 }
488             }
489         }
490     }
491
492     /**
493      * When a response arrives from the LDAP server, it is processed by
494      * this routine. It will pass the message on to the listening object
495      * associated with the LDAP msgId.
496      * @param msg New message from LDAP server
497      */

498     private void processResponse (LDAPMessage msg, int size) {
499         Integer JavaDoc messageID = new Integer JavaDoc (msg.getMessageID());
500         LDAPMessageQueue l = (LDAPMessageQueue)m_requests.get (messageID);
501         if (l == null) {
502             return; /* nobody is waiting for this response (!) */
503         }
504
505         // For asynch operations controls are to be read from the LDAPMessage
506
// For synch operations controls are copied into the LDAPConnection
507
// For synch search operations, controls are also copied into
508
// LDAPSearchResults (see LDAPConnection.checkSearchMsg())
509
if ( ! l.isAsynchOp()) {
510             
511             /* Were there any controls for this client? */
512             LDAPControl[] con = msg.getControls();
513             if (con != null) {
514                 int msgid = msg.getMessageID();
515                 LDAPConnection ldc = l.getConnection(msgid);
516                 if (ldc != null) {
517                     ldc.setResponseControls( this,
518                         new LDAPResponseControl(ldc, msgid, con));
519                 }
520             }
521         }
522
523         if (m_cache != null && (l instanceof LDAPSearchListener)) {
524             cacheSearchResult((LDAPSearchListener)l, msg, size);
525         }
526         
527         l.addMessage (msg);
528
529         if (msg instanceof LDAPResponse) {
530             m_requests.remove (messageID);
531             if (m_requests.size() == 0) {
532                 m_backlogCheckCounter = BACKLOG_CHKCNT;
533             }
534         }
535     }
536
537     /**
538      * Collect search results to be added to the LDAPCache. Search results are
539      * packaged in a vector and temporary stored into a hashtable m_messages
540      * using the message id as the key. The vector first element (at index 0)
541      * is a Long integer representing the total size of all LDAPEntries entries.
542      * It is followed by the actual LDAPEntries.
543      * If the total size of entries exceeds the LDAPCache max size, or a referral
544      * has been received, caching of search results is disabled and the entry is
545      * not added to the LDAPCache. A disabled search request is denoted by setting
546      * the entry size to -1.
547      */

548     private synchronized void cacheSearchResult (LDAPSearchListener l, LDAPMessage msg, int size) {
549         Integer JavaDoc messageID = new Integer JavaDoc (msg.getMessageID());
550         Long JavaDoc key = l.getKey();
551         Vector v = null;
552
553         if ((m_cache == null) || (key == null)) {
554             return;
555         }
556         
557         if (msg instanceof LDAPSearchResult) {
558
559             // get the vector containing the LDAPMessages for the specified messageID
560
v = (Vector)m_messages.get(messageID);
561             if (v == null) {
562                 m_messages.put(messageID, v = new Vector());
563                 v.addElement(new Long JavaDoc(0));
564             }
565
566             // Return if the entry size is -1, i.e. the caching is disabled
567
if (((Long JavaDoc)v.firstElement()).longValue() == -1L) {
568                 return;
569             }
570             
571             // add the size of the current LDAPMessage to the lump sum
572
// assume the size of LDAPMessage is more or less the same as the size
573
// of LDAPEntry. Eventually LDAPEntry object gets stored in the cache
574
// instead of LDAPMessage object.
575
long entrySize = ((Long JavaDoc)v.firstElement()).longValue() + size;
576
577             // If the entrySize exceeds the cache size, discard the collected
578
// entries and disble collecting of entries for this search request
579
// by setting the entry size to -1.
580
if (entrySize > m_cache.getSize()) {
581                 v.removeAllElements();
582                 v.addElement(new Long JavaDoc(-1L));
583                 return;
584             }
585                 
586             // update the lump sum located in the first element of the vector
587
v.setElementAt(new Long JavaDoc(entrySize), 0);
588
589             // convert LDAPMessage object into LDAPEntry which is stored to the
590
// end of the Vector
591
v.addElement(((LDAPSearchResult)msg).getEntry());
592
593         } else if (msg instanceof LDAPSearchResultReference) {
594
595             // If a search reference is received disable caching of
596
// this search request
597
v = (Vector)m_messages.get(messageID);
598             if (v == null) {
599                 m_messages.put(messageID, v = new Vector());
600             }
601             else {
602                 v.removeAllElements();
603             }
604             v.addElement(new Long JavaDoc(-1L));
605
606         } else if (msg instanceof LDAPResponse) {
607
608             // The search request has completed. Store the cache entry
609
// in the LDAPCache if the operation has succeded and caching
610
// is not disabled due to the entry size or referrals
611

612             boolean fail = ((LDAPResponse)msg).getResultCode() > 0;
613             v = (Vector)m_messages.remove(messageID);
614             
615             if (!fail) {
616                 // If v is null, meaning there are no search results from the
617
// server
618
if (v == null) {
619                     v = new Vector();
620                     v.addElement(new Long JavaDoc(0));
621                 }
622
623                 // add the new entry if the entry size is not -1 (caching diabled)
624
if (((Long JavaDoc)v.firstElement()).longValue() != -1L) {
625                     m_cache.addEntry(key, v);
626                 }
627             }
628         }
629     }
630
631     /**
632      * Stop dispatching responses for a particular message ID.
633      * @param id Message ID for which to discard responses.
634      */

635     void abandon (int id ) {
636         
637         if (!m_doRun) {
638             return;
639         }
640         
641         LDAPMessageQueue l = (LDAPMessageQueue)m_requests.remove(new Integer JavaDoc(id));
642         // Clean up cache if enabled
643
if (m_messages != null) {
644             m_messages.remove(new Integer JavaDoc(id));
645         }
646         if (l != null) {
647             l.removeRequest(id);
648         }
649         resultRetrieved(); // If LDAPConnThread is blocked in checkBacklog()
650
}
651
652     /**
653      * Change listener for a message ID. Required when LDAPMessageQueue.merge()
654      * is invoked.
655      * @param id Message ID for which to chanage the listener.
656      * @return Previous listener.
657      */

658     LDAPMessageQueue changeListener (int id, LDAPMessageQueue toNotify) {
659
660         if (!m_doRun) {
661             toNotify.setException(this, new LDAPException("Server or network error",
662                                                            LDAPException.SERVER_DOWN));
663             return null;
664         }
665         return (LDAPMessageQueue) m_requests.put (new Integer JavaDoc (id), toNotify);
666     }
667
668     /**
669      * Handles network errors. Basically shuts down the whole connection.
670      * @param e The exception which was caught while trying to read from
671      * input stream.
672      */

673     private synchronized void networkError (Exception JavaDoc e) {
674
675         m_doRun =false;
676
677         // notify the Connection Setup Manager that the connection is lost
678
m_connMgr.invalidateConnection();
679
680         try {
681             
682             // notify each listener that the server is down.
683
Enumeration requests = m_requests.elements();
684             while (requests.hasMoreElements()) {
685                 LDAPMessageQueue listener =
686                     (LDAPMessageQueue)requests.nextElement();
687                 listener.setException(this, new LDAPException("Server or network error",
688                                                         LDAPException.SERVER_DOWN));
689             }
690
691         } catch (NullPointerException JavaDoc ee) {
692           System.err.println("Exception: "+ee.toString());
693         }
694
695         cleanUp();
696     }
697 }
698
Popular Tags