KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > com > sun > jmx > snmp > daemon > CommunicatorServer


1 /*
2  * @(#)file CommunicatorServer.java
3  * @(#)author Sun Microsystems, Inc.
4  * @(#)version 1.58
5  * @(#)lastedit 04/02/19
6  *
7  * Copyright 2004 Sun Microsystems, Inc. All rights reserved.
8  * SUN PROPRIETARY/CONFIDENTIAL. Use is subject to license terms.
9  *
10  */

11
12
13 package com.sun.jmx.snmp.daemon;
14
15
16
17 // java import
18
//
19
import java.io.ObjectInputStream JavaDoc;
20 import java.io.IOException JavaDoc;
21 import java.net.InetAddress JavaDoc;
22 import java.util.Date JavaDoc;
23 import java.util.Vector JavaDoc;
24 import java.util.Enumeration JavaDoc;
25
26 // jmx import
27
//
28
import javax.management.MBeanServer JavaDoc;
29 import javax.management.MBeanRegistration JavaDoc;
30 import javax.management.ObjectName JavaDoc;
31 import javax.management.NotificationListener JavaDoc;
32 import javax.management.NotificationFilter JavaDoc;
33 import javax.management.NotificationBroadcaster JavaDoc;
34 import javax.management.NotificationBroadcasterSupport JavaDoc;
35 import javax.management.MBeanNotificationInfo JavaDoc;
36 import javax.management.AttributeChangeNotification JavaDoc;
37 import javax.management.ListenerNotFoundException JavaDoc;
38 import javax.management.loading.ClassLoaderRepository JavaDoc;
39 import javax.management.MBeanServerFactory JavaDoc;
40
41 // jmx RI import
42
//
43
import com.sun.jmx.trace.Trace;
44
45 // JSR 160 import
46
//
47
// XXX Revisit:
48
// used to import com.sun.jmx.snmp.MBeanServerForwarder
49
// Now using JSR 160 instead. => this is an additional
50
// dependency to JSR 160.
51
//
52
import javax.management.remote.MBeanServerForwarder JavaDoc;
53
54 /**
55  * Defines generic behavior for the server part of a connector or an adaptor.
56  * Most connectors or adaptors extend <CODE>CommunicatorServer</CODE>
57  * and inherit this behavior. Connectors or adaptors that do not fit into
58  * this model do not extend <CODE>CommunicatorServer</CODE>.
59  * <p>
60  * A <CODE>CommunicatorServer</CODE> is an active object, it listens for
61  * client requests and processes them in its own thread. When necessary, a
62  * <CODE>CommunicatorServer</CODE> creates other threads to process multiple
63  * requests concurrently.
64  * <p>
65  * A <CODE>CommunicatorServer</CODE> object can be stopped by calling the
66  * <CODE>stop</CODE> method. When it is stopped, the
67  * <CODE>CommunicatorServer</CODE> no longer listens to client requests and
68  * no longer holds any thread or communication resources.
69  * It can be started again by calling the <CODE>start</CODE> method.
70  * <p>
71  * A <CODE>CommunicatorServer</CODE> has a <CODE>State</CODE> attribute
72  * which reflects its activity.
73  * <p>
74  * <TABLE>
75  * <TR><TH>CommunicatorServer</TH> <TH>State</TH></TR>
76  * <TR><TD><CODE>stopped</CODE></TD> <TD><CODE>OFFLINE</CODE></TD></TR>
77  * <TR><TD><CODE>starting</CODE></TD> <TD><CODE>STARTING</CODE></TD></TR>
78  * <TR><TD><CODE>running</CODE></TD> <TD><CODE>ONLINE</CODE></TD></TR>
79  * <TR><TD><CODE>stopping</CODE></TD> <TD><CODE>STOPPING</CODE></TD></TR>
80  * </TABLE>
81  * <p>
82  * The <CODE>STARTING</CODE> state marks the transition
83  * from <CODE>OFFLINE</CODE> to <CODE>ONLINE</CODE>.
84  * <p>
85  * The <CODE>STOPPING</CODE> state marks the transition from
86  * <CODE>ONLINE</CODE> to <CODE>OFFLINE</CODE>. This occurs when the
87  * <CODE>CommunicatorServer</CODE> is finishing or interrupting active
88  * requests.
89  * <p>
90  * When a <CODE>CommunicatorServer</CODE> is unregistered from the MBeanServer,
91  * it is stopped automatically.
92  * <p>
93  * When the value of the <CODE>State</CODE> attribute changes the
94  * <CODE>CommunicatorServer</CODE> sends a
95  * <tt>{@link javax.management.AttributeChangeNotification}</tt> to the
96  * registered listeners, if any.
97  *
98  * <p><b>This API is a Sun Microsystems internal API and is subject
99  * to change without notice.</b></p>
100  * @version 1.58 02/19/04
101  * @author Sun Microsystems, Inc
102  */

103
104 public abstract class CommunicatorServer
105     implements Runnable JavaDoc, MBeanRegistration JavaDoc, NotificationBroadcaster JavaDoc,
106            CommunicatorServerMBean {
107
108     //
109
// States of a CommunicatorServer
110
//
111

112     /**
113      * Represents an <CODE>ONLINE</CODE> state.
114      */

115     public static final int ONLINE = 0 ;
116
117     /**
118      * Represents an <CODE>OFFLINE</CODE> state.
119      */

120     public static final int OFFLINE = 1 ;
121
122     /**
123      * Represents a <CODE>STOPPING</CODE> state.
124      */

125     public static final int STOPPING = 2 ;
126
127     /**
128      * Represents a <CODE>STARTING</CODE> state.
129      */

130     public static final int STARTING = 3 ;
131
132     //
133
// Types of connectors.
134
//
135

136     /**
137      * Indicates that it is an RMI connector type.
138      */

139     //public static final int RMI_TYPE = 1 ;
140

141     /**
142      * Indicates that it is an HTTP connector type.
143      */

144     //public static final int HTTP_TYPE = 2 ;
145

146     /**
147      * Indicates that it is an HTML connector type.
148      */

149     //public static final int HTML_TYPE = 3 ;
150

151     /**
152      * Indicates that it is an SNMP connector type.
153      */

154     public static final int SNMP_TYPE = 4 ;
155
156     /**
157      * Indicates that it is an HTTPS connector type.
158      */

159     //public static final int HTTPS_TYPE = 5 ;
160

161     //
162
// Package variables
163
//
164

165     /**
166      * The state of the connector server.
167      */

168      transient volatile int state = OFFLINE ;
169
170     /**
171      * The object name of the connector server.
172      * @serial
173      */

174     ObjectName JavaDoc objectName ;
175
176     MBeanServer JavaDoc topMBS;
177     MBeanServer JavaDoc bottomMBS;
178
179     /**
180      */

181     transient String JavaDoc dbgTag = null ;
182
183     /**
184      * The maximum number of clients that the CommunicatorServer can
185      * process concurrently.
186      * @serial
187      */

188     int maxActiveClientCount = 1 ;
189
190     /**
191      */

192     transient int servedClientCount = 0 ;
193
194     /**
195      * The host name used by this CommunicatorServer.
196      * @serial
197      */

198     String JavaDoc host = null ;
199
200     /**
201      * The port number used by this CommunicatorServer.
202      * @serial
203      */

204     int port = -1 ;
205
206
207     //
208
// Private fields
209
//
210

211     /* This object controls access to the "state" and "interrupted" variables.
212        If held at the same time as the lock on "this", the "this" lock must
213        be taken first. */

214     private transient Object JavaDoc stateLock = new Object JavaDoc();
215
216     private transient Vector JavaDoc clientHandlerVector = new Vector JavaDoc() ;
217
218     private transient Thread JavaDoc fatherThread = Thread.currentThread() ;
219     private transient Thread JavaDoc mainThread = null ;
220
221     private volatile boolean stopRequested = false ;
222     private boolean interrupted = false;
223     private transient Exception JavaDoc startException = null;
224
225     // Notifs count, broadcaster and info
226
private transient long notifCount = 0;
227     private transient NotificationBroadcasterSupport JavaDoc notifBroadcaster =
228     new NotificationBroadcasterSupport JavaDoc();
229     private transient MBeanNotificationInfo JavaDoc[] notifInfos = null;
230     
231
/**
 * Instantiates a <CODE>CommunicatorServer</CODE> of the given connector type.
 * <p>
 * The debug tag is computed here through <CODE>makeDebugTag()</CODE>, which
 * in turn calls <CODE>getProtocol()</CODE> and <CODE>getPort()</CODE>.
 *
 * @param connectorType Indicates the connector type. The only accepted
 * value is <CODE>SNMP_TYPE</CODE>.
 *
 * @exception java.lang.IllegalArgumentException
 * The connector type is not <CODE>SNMP_TYPE</CODE>.
 */
public CommunicatorServer(int connectorType)
    throws IllegalArgumentException {
    // Reject anything but the single supported connector type up front.
    if (connectorType != SNMP_TYPE) {
        throw new IllegalArgumentException("Invalid connector Type");
    }
    infoType = Trace.INFO_ADAPTOR_SNMP;
    dbgTag = makeDebugTag();
}
252
253     protected Thread JavaDoc createMainThread() {
254     return new Thread JavaDoc (this, makeThreadName());
255     }
256
257     /**
258      * Starts this <CODE>CommunicatorServer</CODE>.
259      * <p>
260      * Has no effect if this <CODE>CommunicatorServer</CODE> is
261      * <CODE>ONLINE</CODE> or <CODE>STOPPING</CODE>.
262      * @param timeout Time in ms to wait for the connector to start.
263      * If <code>timeout</code> is positive, wait for at most
264      * the specified time. An infinite timeout can be specified
265      * by passing a <code>timeout</code> value equals
266      * <code>Long.MAX_VALUE</code>. In that case the method
267      * will wait until the connector starts or fails to start.
268      * If timeout is negative or zero, returns as soon as possible
269      * without waiting.
270      * @exception CommunicationException if the connectors fails to start.
271      * @exception InterruptedException if the thread is interrupted or the
272      * timeout expires.
273      */

274     public void start(long timeout)
275     throws CommunicationException, InterruptedException JavaDoc {
276     boolean start;
277
278     synchronized (stateLock) {
279         if (state == STOPPING) {
280         // Fix for bug 4352451:
281
// "java.net.BindException: Address in use".
282
waitState(OFFLINE, 60000);
283         }
284         start = (state == OFFLINE);
285         if (start) {
286         changeState(STARTING);
287         stopRequested = false;
288         interrupted = false;
289         startException = null;
290         }
291     }
292
293     if (!start) {
294             if (isTraceOn())
295                 trace("start","Connector is not OFFLINE") ;
296         return;
297     }
298
299     if (isTraceOn())
300         trace("start","--> Start connector ") ;
301
302     mainThread = createMainThread();
303
304     mainThread.start() ;
305     
306     if (timeout > 0) waitForStart(timeout);
307     }
308     
309     /**
310      * Starts this <CODE>CommunicatorServer</CODE>.
311      * <p>
312      * Has no effect if this <CODE>CommunicatorServer</CODE> is
313      * <CODE>ONLINE</CODE> or <CODE>STOPPING</CODE>.
314      */

315     public void start() {
316     try {
317         start(0);
318     } catch (InterruptedException JavaDoc x) {
319         // can not happen because of `0'
320
trace("start","interrupted: " + x);
321     }
322     }
323     
324     /**
325      * Stops this <CODE>CommunicatorServer</CODE>.
326      * <p>
327      * Has no effect if this <CODE>CommunicatorServer</CODE> is
328      * <CODE>OFFLINE</CODE> or <CODE>STOPPING</CODE>.
329      */

330     public void stop() {
331     synchronized (stateLock) {
332         if (state == OFFLINE || state == STOPPING) {
333         if (isTraceOn())
334             trace("stop","Connector is not ONLINE") ;
335         return;
336         }
337         changeState(STOPPING);
338         //
339
// Stop the connector thread
340
//
341
if (isTraceOn())
342         trace("stop","Interrupt main thread") ;
343         stopRequested = true ;
344         if (!interrupted) {
345         interrupted = true;
346         mainThread.interrupt();
347         }
348     }
349
350     //
351
// Call terminate on each active client handler
352
//
353
if (isTraceOn()) {
354         trace("stop","terminateAllClient") ;
355     }
356     terminateAllClient() ;
357     
358     // ----------------------
359
// changeState
360
// ----------------------
361
synchronized (stateLock) {
362         if (state == STARTING)
363         changeState(OFFLINE);
364     }
365     }
366
367     /**
368      * Tests whether the <CODE>CommunicatorServer</CODE> is active.
369      *
370      * @return True if connector is <CODE>ONLINE</CODE>; false otherwise.
371      */

372     public boolean isActive() {
373     synchronized (stateLock) {
374         return (state == ONLINE);
375     }
376     }
377
378     /**
379      * <p>Waits until either the State attribute of this MBean equals the
380      * specified <VAR>wantedState</VAR> parameter,
381      * or the specified <VAR>timeOut</VAR> has elapsed.
382      * The method <CODE>waitState</CODE> returns with a boolean value
383      * indicating whether the specified <VAR>wantedState</VAR> parameter
384      * equals the value of this MBean's State attribute at the time the method
385      * terminates.</p>
386      *
387      * <p>Two special cases for the <VAR>timeOut</VAR> parameter value are:</p>
388      * <UL><LI> if <VAR>timeOut</VAR> is negative then <CODE>waitState</CODE>
389      * returns immediately (i.e. does not wait at all),</LI>
390      * <LI> if <VAR>timeOut</VAR> equals zero then <CODE>waitState</CODE>
391      * waits untill the value of this MBean's State attribute
392      * is the same as the <VAR>wantedState</VAR> parameter (i.e. will wait
393      * indefinitely if this condition is never met).</LI></UL>
394      *
395      * @param wantedState The value of this MBean's State attribute to wait
396      * for. <VAR>wantedState</VAR> can be one of:
397      * <ul>
398      * <li><CODE>CommunicatorServer.OFFLINE</CODE>,</li>
399      * <li><CODE>CommunicatorServer.ONLINE</CODE>,</li>
400      * <li><CODE>CommunicatorServer.STARTING</CODE>,</li>
401      * <li><CODE>CommunicatorServer.STOPPING</CODE>.</li>
402      * </ul>
403      * @param timeOut The maximum time to wait for, in milliseconds,
404      * if positive.
405      * Infinite time out if 0, or no waiting at all if negative.
406      *
407      * @return true if the value of this MBean's State attribute is the
408      * same as the <VAR>wantedState</VAR> parameter; false otherwise.
409      */

410     public boolean waitState(int wantedState, long timeOut) {
411         if (isTraceOn())
412             trace("waitState", wantedState + "(0on,1off,2st) TO=" + timeOut +
413           " ; current state = " + getStateString());
414
415     long endTime = 0;
416     if (timeOut > 0)
417         endTime = System.currentTimeMillis() + timeOut;
418
419     synchronized (stateLock) {
420         while (state != wantedState) {
421         if (timeOut < 0) {
422             if (isTraceOn())
423             trace("waitState", "timeOut < 0, return without wait");
424             return false;
425         } else {
426             try {
427             if (timeOut > 0) {
428                 long toWait = endTime - System.currentTimeMillis();
429                 if (toWait <= 0) {
430                 if (isTraceOn())
431                     trace("waitState", "timed out");
432                 return false;
433                 }
434                 stateLock.wait(toWait);
435             } else { // timeOut == 0
436
stateLock.wait();
437             }
438             } catch (InterruptedException JavaDoc e) {
439             if (isTraceOn())
440                 trace("waitState", "wait interrupted");
441             return (state == wantedState);
442             }
443         }
444         }
445         if (isTraceOn())
446         trace("waitState", "returning in desired state");
447         return true;
448     }
449     }
450
451     /**
452      * <p>Waits until the communicator is started or timeout expires.
453      *
454      * @param timeout Time in ms to wait for the connector to start.
455      * If <code>timeout</code> is positive, wait for at most
456      * the specified time. An infinite timeout can be specified
457      * by passing a <code>timeout</code> value equals
458      * <code>Long.MAX_VALUE</code>. In that case the method
459      * will wait until the connector starts or fails to start.
460      * If timeout is negative or zero, returns as soon as possible
461      * without waiting.
462      *
463      * @exception CommunicationException if the connectors fails to start.
464      * @exception InterruptedException if the thread is interrupted or the
465      * timeout expires.
466      *
467      */

468     private void waitForStart(long timeout)
469     throws CommunicationException, InterruptedException JavaDoc {
470         if (isTraceOn())
471             trace("waitForStart", "Timeout=" + timeout +
472           " ; current state = " + getStateString());
473     
474     final long startTime = System.currentTimeMillis();
475  
476     synchronized (stateLock) {
477         while (state == STARTING) {
478         // Time elapsed since startTime...
479
//
480
final long elapsed = System.currentTimeMillis() - startTime;
481
482         // wait for timeout - elapsed.
483
// A timeout of Long.MAX_VALUE is equivalent to something
484
// like 292271023 years - which is pretty close to
485
// forever as far as we are concerned ;-)
486
//
487
final long remainingTime = timeout-elapsed;
488
489         // If remainingTime is negative, the timeout has elapsed.
490
//
491
if (remainingTime < 0) {
492             if (isTraceOn())
493             trace("waitForStart",
494                   "timeout < 0, return without wait");
495             throw new InterruptedException JavaDoc("Timeout expired");
496         }
497
498         // We're going to wait until someone notifies on the
499
// the stateLock object, or until the timeout expires,
500
// or until the thread is interrupted.
501
//
502
try {
503             stateLock.wait(remainingTime);
504         } catch (InterruptedException JavaDoc e) {
505             if (isTraceOn())
506             trace("waitForStart", "wait interrupted");
507
508             // If we are now ONLINE, then no need to rethrow the
509
// exception... we're simply going to exit the while
510
// loop. Otherwise, throw the InterruptedException.
511
//
512
if (state != ONLINE) throw e;
513         }
514         }
515
516         // We're no longer in STARTING state
517
//
518
if (state == ONLINE) {
519         // OK, we're started, everything went fine, just return
520
//
521
if (isTraceOn()) trace("waitForStart", "started");
522         return;
523         } else if (startException instanceof CommunicationException) {
524         // There was some exception during the starting phase.
525
// Cast and throw...
526
//
527
throw (CommunicationException)startException;
528         } else if (startException instanceof InterruptedException JavaDoc) {
529         // There was some exception during the starting phase.
530
// Cast and throw...
531
//
532
throw (InterruptedException JavaDoc)startException;
533         } else if (startException != null) {
534         // There was some exception during the starting phase.
535
// Wrap and throw...
536
//
537
throw new CommunicationException(startException,
538                          "Failed to start: "+
539                          startException);
540         } else {
541         // We're not ONLINE, and there's no exception...
542
// Something went wrong but we don't know what...
543
//
544
throw new CommunicationException("Failed to start: state is "+
545                          getStringForState(state));
546         }
547     }
548     }
549
550     /**
551      * Gets the state of this <CODE>CommunicatorServer</CODE> as an integer.
552      *
553      * @return <CODE>ONLINE</CODE>, <CODE>OFFLINE</CODE>,
554      * <CODE>STARTING</CODE> or <CODE>STOPPING</CODE>.
555      */

556     public int getState() {
557     synchronized (stateLock) {
558         return state ;
559     }
560     }
561
562     /**
563      * Gets the state of this <CODE>CommunicatorServer</CODE> as a string.
564      *
565      * @return One of the strings "ONLINE", "OFFLINE", "STARTING" or
566      * "STOPPING".
567      */

568     public String JavaDoc getStateString() {
569         return getStringForState(state) ;
570     }
571
572     /**
573      * Gets the host name used by this <CODE>CommunicatorServer</CODE>.
574      *
575      * @return The host name used by this <CODE>CommunicatorServer</CODE>.
576      */

577     public String JavaDoc getHost() {
578         try {
579             host = InetAddress.getLocalHost().getHostName();
580         } catch (Exception JavaDoc e) {
581             host = "Unknown host";
582         }
583         return host ;
584     }
585
586     /**
587      * Gets the port number used by this <CODE>CommunicatorServer</CODE>.
588      *
589      * @return The port number used by this <CODE>CommunicatorServer</CODE>.
590      */

591     public int getPort() {
592     synchronized (stateLock) {
593         return port ;
594     }
595     }
596
597     /**
598      * Sets the port number used by this <CODE>CommunicatorServer</CODE>.
599      *
600      * @param port The port number used by this
601      * <CODE>CommunicatorServer</CODE>.
602      *
603      * @exception java.lang.IllegalStateException This method has been invoked
604      * while the communicator was ONLINE or STARTING.
605      */

606     public void setPort(int port) throws java.lang.IllegalStateException JavaDoc {
607     synchronized (stateLock) {
608         if ((state == ONLINE) || (state == STARTING))
609         throw new IllegalStateException JavaDoc("Stop server before " +
610                         "carrying out this operation");
611         this.port = port;
612         dbgTag = makeDebugTag();
613     }
614     }
615
616     /**
617      * Gets the protocol being used by this <CODE>CommunicatorServer</CODE>.
         * <p>
         * The returned string is embedded in the debug tag built by
         * <CODE>makeDebugTag()</CODE>. Since the constructor of this class
         * calls <CODE>makeDebugTag()</CODE>, implementations are invoked
         * before the subclass constructor body has run.
         *
618      * @return The protocol as a string.
619      */

620     public abstract String JavaDoc getProtocol() ;
621
622     /**
623      * Gets the number of clients that have been processed by this
624      * <CODE>CommunicatorServer</CODE> since its creation.
625      *
626      * @return The number of clients handled by this
627      * <CODE>CommunicatorServer</CODE>
628      * since its creation. This counter is not reset by the
629      * <CODE>stop</CODE> method.
630      */

631     int getServedClientCount() {
632         return servedClientCount ;
633     }
634   
635     /**
636      * Gets the number of clients currently being processed by this
637      * <CODE>CommunicatorServer</CODE>.
638      *
639      * @return The number of clients currently being processed by this
640      * <CODE>CommunicatorServer</CODE>.
641      */

642     int getActiveClientCount() {
643         int result = clientHandlerVector.size() ;
644         return result ;
645     }
646
647     /**
648      * Gets the maximum number of clients that this
649      * <CODE>CommunicatorServer</CODE> can process concurrently.
650      *
651      * @return The maximum number of clients that this
652      * <CODE>CommunicatorServer</CODE> can
653      * process concurrently.
654      */

655     int getMaxActiveClientCount() {
656         return maxActiveClientCount ;
657     }
658
659     /**
660      * Sets the maximum number of clients this
661      * <CODE>CommunicatorServer</CODE> can process concurrently.
662      *
663      * @param c The number of clients.
664      *
665      * @exception java.lang.IllegalStateException This method has been invoked
666      * while the communicator was ONLINE or STARTING.
667      */

668     void setMaxActiveClientCount(int c)
669     throws java.lang.IllegalStateException JavaDoc {
670     synchronized (stateLock) {
671         if ((state == ONLINE) || (state == STARTING)) {
672         throw new IllegalStateException JavaDoc(
673               "Stop server before carrying out this operation");
674         }
675         maxActiveClientCount = c ;
676     }
677     }
678
679     /**
680      * For SNMP Runtime internal use only.
681      */

682     void notifyClientHandlerCreated(ClientHandler h) {
683         clientHandlerVector.addElement(h) ;
684     }
685
686     /**
687      * For SNMP Runtime internal use only.
688      */

689     synchronized void notifyClientHandlerDeleted(ClientHandler h) {
690         clientHandlerVector.removeElement(h);
691     notifyAll();
692     }
693
694     /**
695      * The number of times the communicator server will attempt
696      * to bind before giving up.
697      **/

698     protected int getBindTries() {
699     return 50;
700     }
701
702     /**
703      * The delay, in ms, during which the communicator server will sleep before
704      * attempting to bind again.
705      **/

706     protected long getBindSleepTime() {
707     return 100;
708     }
709
710     /**
711      * For SNMP Runtime internal use only.
712      * <p>
713      * The <CODE>run</CODE> method executed by this connector's main thread.
714      */

715     public void run() {
716         
717         // Fix jaw.00667.B
718
// It seems that the init of "i" and "success"
719
// need to be done outside the "try" clause...
720
// A bug in Java 2 production release ?
721
//
722
int i = 0;
723         boolean success = false;
724         
725         // ----------------------
726
// Bind
727
// ----------------------
728
try {
729             // Fix for bug 4352451: "java.net.BindException: Address in use".
730
//
731
final int bindRetries = getBindTries();
732         final long sleepTime = getBindSleepTime();
733             while (i < bindRetries && !success) {
734                 try {
735                     // Try socket connection.
736
//
737
doBind();
738                     success = true;
739                 } catch (CommunicationException ce) {
740                     i++;
741                     try {
742                         Thread.sleep(sleepTime);
743                     } catch (InterruptedException JavaDoc ie) {
744             throw ie;
745                     }
746                 }
747             }
748             // Retry last time to get correct exception.
749
//
750
if (!success) {
751                 // Try socket connection.
752
//
753
doBind();
754             }
755
756         } catch(Exception JavaDoc x) {
757             if (isDebugOn()) {
758                 debug("run","Unexpected exception = "+x) ;
759             }
760         synchronized(stateLock) {
761         startException = x;
762         changeState(OFFLINE);
763         }
764             if (isTraceOn()) {
765                 trace("run","State is OFFLINE") ;
766             }
767         doError(x);
768         return;
769         }
770
771         try {
772             // ----------------------
773
// State change
774
// ----------------------
775
changeState(ONLINE) ;
776             if (isTraceOn()) {
777                 trace("run","State is ONLINE") ;
778             }
779
780             // ----------------------
781
// Main loop
782
// ----------------------
783
while (!stopRequested) {
784                 servedClientCount++;
785                 doReceive() ;
786                 waitIfTooManyClients() ;
787                 doProcess() ;
788             }
789             if (isTraceOn()) {
790                 trace("run","Stop has been requested") ;
791             }
792
793         } catch(InterruptedException JavaDoc x) {
794             if (isTraceOn()) {
795                 trace("run","Interrupt caught") ;
796             }
797             changeState(STOPPING);
798         } catch(Exception JavaDoc x) {
799             if (isDebugOn()) {
800                 debug("run","Unexpected exception = "+x) ;
801             }
802             changeState(STOPPING);
803     } finally {
804         synchronized (stateLock) {
805         interrupted = true;
806         Thread.currentThread().interrupted();
807         }
808
809         // ----------------------
810
// unBind
811
// ----------------------
812
try {
813         doUnbind() ;
814         waitClientTermination() ;
815         changeState(OFFLINE);
816         if (isTraceOn()) {
817             trace("run","State is OFFLINE") ;
818         }
819         } catch(Exception JavaDoc x) {
820         if (isDebugOn()) {
821             debug("run","Unexpected exception = "+x) ;
822         }
823         changeState(OFFLINE);
824         }
825         
826     }
827     }
828
829     /**
         * Hook called by <CODE>run()</CODE> when the bind phase has failed.
         * <p>
         * <CODE>run()</CODE> invokes it after recording the failure as the
         * start exception and switching the state to <CODE>OFFLINE</CODE>,
         * and returns right after the call.
         *
         * @param e The exception caught by <CODE>run()</CODE> while binding.
         * @exception CommunicationException declared so that implementations
         * may report the failure by throwing.
830      */

831     protected abstract void doError(Exception JavaDoc e) throws CommunicationException;
832
833     //
834
// To be defined by the subclass.
835
//
836
// Each method below is called by run() and must be subclassed.
837
// If the method sends an exception (Communication or Interrupt), this
838
// will end up the run() method and switch the connector offline.
839
//
840
// If it is a CommunicationException, run() will call
841
// Debug.printException().
842
//
843
// All these methods should propagate the InterruptedException to inform
844
// run() that the connector must be switched OFFLINE.
845
//
846
//
847
//
848
// doBind() should do everything that is needed before calling doReceive().
849
// If doBind() throws an exception, doUnbind() is not to be called
850
// and run() ends up.
851
//
852

853     /**
         * <CODE>doBind()</CODE> is called by <CODE>run()</CODE> before the
         * main loop starts.
         * <p>
         * While it throws a <CODE>CommunicationException</CODE>,
         * <CODE>run()</CODE> sleeps for <CODE>getBindSleepTime()</CODE> ms and
         * calls it again, up to <CODE>getBindTries()</CODE> times, then makes
         * one last attempt. If that attempt fails too, or if any other
         * exception is thrown, <CODE>run()</CODE> switches the connector
         * <CODE>OFFLINE</CODE>, calls <CODE>doError</CODE> and returns without
         * calling <CODE>doUnbind()</CODE>.
         *
         * @exception CommunicationException if the bind attempt failed.
         * @exception InterruptedException if the thread was interrupted.
854      */

855     protected abstract void doBind()
856     throws CommunicationException, InterruptedException JavaDoc ;
857
858     /**
859      * <CODE>doReceive()</CODE> should block until a client is available.
860      * If this method throws an exception, <CODE>doProcess()</CODE> is not
861      * called but <CODE>doUnbind()</CODE> is called then <CODE>run()</CODE>
862      * stops.
         * <p>
         * It is the first call made on each pass of the main loop of
         * <CODE>run()</CODE>, right after the served-client counter has been
         * incremented.
         *
         * @exception CommunicationException if receiving failed;
         * <CODE>run()</CODE> then switches the state to <CODE>STOPPING</CODE>.
         * @exception InterruptedException if the thread was interrupted;
         * <CODE>run()</CODE> then switches the state to <CODE>STOPPING</CODE>.
863      */

864     protected abstract void doReceive()
865     throws CommunicationException, InterruptedException JavaDoc ;
866
867     /**
868      * <CODE>doProcess()</CODE> is called after <CODE>doReceive()</CODE>:
869      * it should process the requests of the incoming client.
870      * If it throws an exception, <CODE>doUnbind()</CODE> is called and
871      * <CODE>run()</CODE> stops.
         * <p>
         * <CODE>run()</CODE> calls <CODE>waitIfTooManyClients()</CODE> between
         * <CODE>doReceive()</CODE> and this method, so this method may be
         * delayed until the number of active clients is below the maximum.
         *
         * @exception CommunicationException if processing failed;
         * <CODE>run()</CODE> then switches the state to <CODE>STOPPING</CODE>.
         * @exception InterruptedException if the thread was interrupted;
         * <CODE>run()</CODE> then switches the state to <CODE>STOPPING</CODE>.
872      */

873     protected abstract void doProcess()
874     throws CommunicationException, InterruptedException JavaDoc ;
875
876     /**
877      * <CODE>doUnbind()</CODE> is called whenever the connector goes
878      * <CODE>OFFLINE</CODE>, except if <CODE>doBind()</CODE> has thrown an
879      * exception.
         * <p>
         * <CODE>run()</CODE> calls it from its <CODE>finally</CODE> clause,
         * before waiting for the active clients to terminate. If it throws,
         * <CODE>run()</CODE> skips that wait and still switches the state to
         * <CODE>OFFLINE</CODE>.
         *
         * @exception CommunicationException if releasing the resources failed.
         * @exception InterruptedException if the thread was interrupted.
880      */

881     protected abstract void doUnbind()
882     throws CommunicationException, InterruptedException JavaDoc ;
883
884     /**
885      * Get the <code>MBeanServer</code> object to which incoming requests are
886      * sent. This is either the MBean server in which this connector is
887      * registered, or an <code>MBeanServerForwarder</code> leading to that
888      * server.
889      */

890     public synchronized MBeanServer JavaDoc getMBeanServer() {
891         return topMBS;
892     }
893
894     /**
895      * Set the <code>MBeanServer</code> object to which incoming
896      * requests are sent. This must be either the MBean server in
897      * which this connector is registered, or an
898      * <code>MBeanServerForwarder</code> leading to that server. An
899      * <code>MBeanServerForwarder</code> <code>mbsf</code> leads to an
900      * MBean server <code>mbs</code> if
901      * <code>mbsf.getMBeanServer()</code> is either <code>mbs</code>
902      * or an <code>MBeanServerForwarder</code> leading to
903      * <code>mbs</code>.
904      *
905      * @exception IllegalArgumentException if <code>newMBS</code> is neither
906      * the MBean server in which this connector is registered nor an
907      * <code>MBeanServerForwarder</code> leading to that server.
908      *
909      * @exception IllegalStateException This method has been invoked
910      * while the communicator was ONLINE or STARTING.
911      */

912     public synchronized void setMBeanServer(MBeanServer JavaDoc newMBS)
913         throws IllegalArgumentException JavaDoc, IllegalStateException JavaDoc {
914     synchronized (stateLock) {
915         if (state == ONLINE || state == STARTING)
916         throw new IllegalStateException JavaDoc("Stop server before " +
917                         "carrying out this operation");
918     }
919     final String JavaDoc error =
920         "MBeanServer argument must be MBean server where this " +
921         "server is registered, or an MBeanServerForwarder " +
922         "leading to that server";
923     Vector JavaDoc seenMBS = new Vector JavaDoc();
924     for (MBeanServer JavaDoc mbs = newMBS;
925          mbs != bottomMBS;
926          mbs = ((MBeanServerForwarder JavaDoc) mbs).getMBeanServer()) {
927         if (!(mbs instanceof MBeanServerForwarder JavaDoc))
928         throw new IllegalArgumentException JavaDoc(error);
929         if (seenMBS.contains(mbs))
930         throw new IllegalArgumentException JavaDoc("MBeanServerForwarder " +
931                            "loop");
932         seenMBS.addElement(mbs);
933     }
934     topMBS = newMBS;
935     }
936
    //
    // To be called by the subclass if needed
    //
940
/**
941      * For internal use only.
942      */

943     ObjectName JavaDoc getObjectName() {
944         return objectName ;
945     }
946
947     /**
948      * For internal use only.
949      */

950     void changeState(int newState) {
951     int oldState;
952     synchronized (stateLock) {
953         if (state == newState)
954         return;
955         oldState = state;
956         state = newState;
957         stateLock.notifyAll();
958     }
959     sendStateChangeNotification(oldState, newState);
960     }
961     
962     /**
963      * Returns the string used in debug traces.
964      */

965     String JavaDoc makeDebugTag() {
966         return "CommunicatorServer["+ getProtocol() + ":" + getPort() + "]" ;
967     }
968
969     /**
970      * Returns the string used to name the connector thread.
971      */

972     String JavaDoc makeThreadName() {
973         String JavaDoc result ;
974
975         if (objectName == null)
976             result = "CommunicatorServer" ;
977         else
978             result = objectName.toString() ;
979         
980         return result ;
981     }
982   
983     /**
984      * This method blocks if there are too many active clients.
985      * Call to <CODE>wait()</CODE> is terminated when a client handler
986      * thread calls <CODE>notifyClientHandlerDeleted(this)</CODE> ;
987      */

988     private synchronized void waitIfTooManyClients()
989     throws InterruptedException JavaDoc {
990         while (getActiveClientCount() >= maxActiveClientCount) {
991             if (isTraceOn()) {
992                 trace("waitIfTooManyClients",
993               "Waiting for a client to terminate") ;
994             }
995             wait();
996         }
997     }
998
999     /**
1000     * This method blocks until there is no more active client.
1001     */

1002    private void waitClientTermination() {
1003        int s = clientHandlerVector.size() ;
1004        if (isTraceOn()) {
1005            if (s >= 1) {
1006                trace("waitClientTermination","waiting for " +
1007                      s + " clients to terminate") ;
1008            }
1009        }
1010
1011        for (Enumeration JavaDoc e = clientHandlerVector.elements() ;
1012         e.hasMoreElements();){
1013            ClientHandler h = (ClientHandler)e.nextElement() ;
1014            h.join() ;
1015        }
1016
1017        if (isTraceOn()) {
1018            if (s >= 1) {
1019                trace("waitClientTermination","Ok, let's go...") ;
1020            }
1021        }
1022    }
1023  
1024    /**
1025     * Call <CODE>interrupt()</CODE> on each pending client.
1026     */

1027    private void terminateAllClient() {
1028        int s = clientHandlerVector.size() ;
1029        if (isTraceOn()) {
1030            if (s >= 1) {
1031                trace("terminateAllClient","Interrupting " + s + " clients") ;
1032            }
1033        }
1034    
1035        for (Enumeration JavaDoc e = clientHandlerVector.elements() ;
1036         e.hasMoreElements();){
1037            ClientHandler h = (ClientHandler)e.nextElement() ;
1038            h.interrupt() ;
1039        }
1040    }
1041
1042    /**
1043     * Controls the way the CommunicatorServer service is deserialized.
1044     */

1045    private void readObject(ObjectInputStream JavaDoc stream)
1046        throws IOException JavaDoc, ClassNotFoundException JavaDoc {
1047      
1048        // Call the default deserialization of the object.
1049
//
1050
stream.defaultReadObject();
1051      
1052        // Call the specific initialization for the CommunicatorServer service.
1053
// This is for transient structures to be initialized to specific
1054
// default values.
1055
//
1056
stateLock = new Object JavaDoc();
1057    state = OFFLINE;
1058        stopRequested = false;
1059        servedClientCount = 0;
1060        clientHandlerVector = new Vector JavaDoc();
1061    fatherThread = Thread.currentThread();
1062    mainThread = null;
1063    notifCount = 0;
1064    notifInfos = null;
1065    notifBroadcaster = new NotificationBroadcasterSupport JavaDoc();
1066    dbgTag = makeDebugTag();
1067    }
1068  
1069
    //
    // NotificationBroadcaster
    //
1073

1074    /**
1075     * Adds a listener for the notifications emitted by this
1076     * CommunicatorServer.
1077     * There is only one type of notifications sent by the CommunicatorServer:
1078     * they are <tt>{@link javax.management.AttributeChangeNotification}</tt>,
1079     * sent when the <tt>State</tt> attribute of this CommunicatorServer
1080     * changes.
1081     *
1082     * @param listener The listener object which will handle the emitted
1083     * notifications.
1084     * @param filter The filter object. If filter is null, no filtering
1085     * will be performed before handling notifications.
1086     * @param handback An object which will be sent back unchanged to the
1087     * listener when a notification is emitted.
1088     *
1089     * @exception IllegalArgumentException Listener parameter is null.
1090     */

1091    public void addNotificationListener(NotificationListener JavaDoc listener,
1092                    NotificationFilter JavaDoc filter,
1093                    Object JavaDoc handback)
1094        throws java.lang.IllegalArgumentException JavaDoc {
1095
1096    if (isDebugOn()) {
1097        debug("addNotificationListener","Adding listener "+ listener +
1098          " with filter "+ filter + " and handback "+ handback);
1099    }
1100    notifBroadcaster.addNotificationListener(listener, filter, handback);
1101    }
1102    
1103    /**
1104     * Removes the specified listener from this CommunicatorServer.
1105     * Note that if the listener has been registered with different
1106     * handback objects or notification filters, all entries corresponding
1107     * to the listener will be removed.
1108     *
1109     * @param listener The listener object to be removed.
1110     *
1111     * @exception ListenerNotFoundException The listener is not registered.
1112     */

1113    public void removeNotificationListener(NotificationListener JavaDoc listener)
1114        throws ListenerNotFoundException JavaDoc {
1115
1116    if (isDebugOn()) {
1117        debug("removeNotificationListener","Removing listener "+ listener);
1118    }
1119    notifBroadcaster.removeNotificationListener(listener);
1120    }
1121    
1122    /**
1123     * Returns an array of MBeanNotificationInfo objects describing
1124     * the notification types sent by this CommunicatorServer.
1125     * There is only one type of notifications sent by the CommunicatorServer:
1126     * it is <tt>{@link javax.management.AttributeChangeNotification}</tt>,
1127     * sent when the <tt>State</tt> attribute of this CommunicatorServer
1128     * changes.
1129     */

1130    public MBeanNotificationInfo JavaDoc[] getNotificationInfo() {
1131    
1132    // Initialize notifInfos on first call to getNotificationInfo()
1133
//
1134
if (notifInfos == null) {
1135        notifInfos = new MBeanNotificationInfo JavaDoc[1];
1136        String JavaDoc[] notifTypes = {
1137        AttributeChangeNotification.ATTRIBUTE_CHANGE};
1138        notifInfos[0] = new MBeanNotificationInfo JavaDoc( notifTypes,
1139             AttributeChangeNotification JavaDoc.class.getName(),
1140             "Sent to notify that the value of the State attribute "+
1141             "of this CommunicatorServer instance has changed.");
1142    }
1143
1144    return notifInfos;
1145    }
1146    
1147    /**
1148     *
1149     */

1150    private void sendStateChangeNotification(int oldState, int newState) {
1151
1152    String JavaDoc oldStateString = getStringForState(oldState);
1153    String JavaDoc newStateString = getStringForState(newState);
1154    String JavaDoc message = new StringBuffer JavaDoc().append(dbgTag)
1155        .append(" The value of attribute State has changed from ")
1156        .append(oldState).append(" (").append(oldStateString)
1157        .append(") to ").append(newState).append(" (")
1158        .append(newStateString).append(").").toString();
1159
1160    notifCount++;
1161    AttributeChangeNotification JavaDoc notif =
1162        new AttributeChangeNotification JavaDoc(this, // source
1163
notifCount, // sequence number
1164
System.currentTimeMillis(), // time stamp
1165
message, // message
1166
"State", // attribute name
1167
"int", // attribute type
1168
new Integer JavaDoc(oldState), // old value
1169
new Integer JavaDoc(newState) ); // new value
1170

1171    if (isDebugOn()) {
1172        debug("sendStateChangeNotification",
1173          "Sending AttributeChangeNotification #"+ notifCount +
1174          " with message: "+ message);
1175    }
1176    notifBroadcaster.sendNotification(notif);
1177    }
1178
1179    /**
1180     *
1181     */

1182    private static String JavaDoc getStringForState(int s) {
1183        switch (s) {
1184        case ONLINE: return "ONLINE";
1185        case STARTING: return "STARTING";
1186        case OFFLINE: return "OFFLINE";
1187        case STOPPING: return "STOPPING";
1188    default: return "UNDEFINED";
1189        }
1190    }
1191
1192
    //
    // MBeanRegistration
    //
1196

1197    /**
1198     * Preregister method of connector.
1199     *
1200     *@param server The <CODE>MBeanServer</CODE> in which the MBean will
1201     * be registered.
1202     *@param name The object name of the MBean.
1203     *
1204     *@return The name of the MBean registered.
1205     *
1206     *@exception java.langException This exception should be caught by
1207     * the <CODE>MBeanServer</CODE> and re-thrown
1208     * as an <CODE>MBeanRegistrationException</CODE>.
1209     */

1210    public ObjectName JavaDoc preRegister(MBeanServer JavaDoc server, ObjectName JavaDoc name)
1211        throws java.lang.Exception JavaDoc {
1212        objectName = name;
1213    synchronized (this) {
1214        if (bottomMBS != null) {
1215        throw new IllegalArgumentException JavaDoc("connector already " +
1216                           "registered in an MBean " +
1217                           "server");
1218        }
1219        topMBS = bottomMBS = server;
1220    }
1221        dbgTag = makeDebugTag();
1222        return name;
1223    }
1224
1225    /**
1226     *
1227     *@param registrationDone Indicates whether or not the MBean has been
1228     * successfully registered in the <CODE>MBeanServer</CODE>.
1229     * The value false means that the registration phase has failed.
1230     */

1231    public void postRegister(Boolean JavaDoc registrationDone) {
1232    if (!registrationDone.booleanValue()) {
1233        synchronized (this) {
1234        topMBS = bottomMBS = null;
1235        }
1236    }
1237    }
1238    
1239    /**
1240     * Stop the connector.
1241     *
1242     * @exception java.langException This exception should be caught by
1243     * the <CODE>MBeanServer</CODE> and re-thrown
1244     * as an <CODE>MBeanRegistrationException</CODE>.
1245     */

1246    public void preDeregister() throws java.lang.Exception JavaDoc {
1247    synchronized (this) {
1248        topMBS = bottomMBS = null;
1249    }
1250        objectName = null ;
1251    final int cstate = getState();
1252        if ((cstate == ONLINE) || ( cstate == STARTING)) {
1253            stop() ;
1254        }
1255    }
1256
1257    /**
1258     * Do nothing.
1259     */

1260    public void postDeregister(){
1261    }
1262
1263    /**
1264     * Load a class using the default loader repository
1265     **/

1266    Class JavaDoc loadClass(String JavaDoc className)
1267    throws ClassNotFoundException JavaDoc {
1268    try {
1269        return Class.forName(className);
1270    } catch (ClassNotFoundException JavaDoc e) {
1271        final ClassLoaderRepository JavaDoc clr =
1272        MBeanServerFactory.getClassLoaderRepository(bottomMBS);
1273        if (clr == null) throw new ClassNotFoundException JavaDoc(className);
1274        return clr.loadClass(className);
1275    }
1276    }
1277
    //
    // Debug stuff
    //
1281

1282    /**
1283     */

1284    int infoType;
1285
1286    /**
1287     */

1288    boolean isTraceOn() {
1289        return Trace.isSelected(Trace.LEVEL_TRACE, infoType);
1290    }
1291
1292    /**
1293     */

1294    void trace(String JavaDoc clz, String JavaDoc func, String JavaDoc info) {
1295        Trace.send(Trace.LEVEL_TRACE, infoType, clz, func, info);
1296    }
1297
1298    /**
1299     */

1300    boolean isDebugOn() {
1301        return Trace.isSelected(Trace.LEVEL_DEBUG, infoType);
1302    }
1303
1304    /**
1305     */

1306    void debug(String JavaDoc clz, String JavaDoc func, String JavaDoc info) {
1307        Trace.send(Trace.LEVEL_DEBUG, infoType, clz, func, info);
1308    }
1309
1310    /**
1311     */

1312    void debug(String JavaDoc clz, String JavaDoc func, Throwable JavaDoc exception) {
1313        Trace.send(Trace.LEVEL_DEBUG, infoType, clz, func, exception);
1314    }
1315    
1316    /**
1317     */

1318    void trace(String JavaDoc func, String JavaDoc info) {
1319        trace(dbgTag, func, info);
1320    }
1321
1322    /**
1323     */

1324    void debug(String JavaDoc func, String JavaDoc info) {
1325        debug(dbgTag, func, info);
1326    }
1327
1328    /**
1329     */

1330    void debug(String JavaDoc func, Throwable JavaDoc exception) {
1331        debug(dbgTag, func, exception);
1332    }
1333}
1334
Popular Tags