KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > jk > common > ChannelSocket


1 /*
2  * Copyright 1999-2004 The Apache Software Foundation
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */

16
17 package org.apache.jk.common;
18
19 import java.io.BufferedInputStream JavaDoc;
20 import java.io.BufferedOutputStream JavaDoc;
21 import java.io.IOException JavaDoc;
22 import java.io.InputStream JavaDoc;
23 import java.io.OutputStream JavaDoc;
24 import java.net.URLEncoder JavaDoc;
25 import java.net.InetAddress JavaDoc;
26 import java.net.ServerSocket JavaDoc;
27 import java.net.Socket JavaDoc;
28 import java.net.SocketException JavaDoc;
29
30 import javax.management.ListenerNotFoundException JavaDoc;
31 import javax.management.MBeanNotificationInfo JavaDoc;
32 import javax.management.Notification JavaDoc;
33 import javax.management.NotificationBroadcaster JavaDoc;
34 import javax.management.NotificationBroadcasterSupport JavaDoc;
35 import javax.management.NotificationFilter JavaDoc;
36 import javax.management.NotificationListener JavaDoc;
37 import javax.management.ObjectName JavaDoc;
38
39 import org.apache.commons.modeler.Registry;
40 import org.apache.jk.core.JkHandler;
41 import org.apache.jk.core.Msg;
42 import org.apache.jk.core.MsgContext;
43 import org.apache.jk.core.JkChannel;
44 import org.apache.jk.core.WorkerEnv;
45 import org.apache.coyote.Request;
46 import org.apache.coyote.RequestGroupInfo;
47 import org.apache.coyote.RequestInfo;
48 import org.apache.tomcat.util.threads.ThreadPool;
49 import org.apache.tomcat.util.threads.ThreadPoolRunnable;
50
51
52 /* XXX Make the 'message type' pluggable
53  */

54
55 /* A lot of the 'original' behavior is hardcoded - this uses Ajp13 wire protocol,
56    TCP, Ajp14 API etc.
57    As we add other protocols/transports/APIs this will change, the current goal
58    is to get the same level of functionality as in the original jk connector.
59 */

60
61 /**
62  * Jk can use multiple protocols/transports.
63  * Various container adapters should load this object ( as a bean ),
64  * set configurations and use it. Note that the connector will handle
65  * all incoming protocols - it's not specific to ajp1x. The protocol
66  * is abstracted by MsgContext/Message/Channel.
67  */

68
69
70 /** Accept ( and send ) TCP messages.
71  *
72  * @author Costin Manolache
73  * @jmx:mbean name="jk:service=ChannelSocket"
74  * description="Accept socket connections"
75  * @jmx:notification name="org.apache.coyote.INVOKE
76  * @jmx:notification-handler name="org.apache.jk.JK_SEND_PACKET
77  * @jmx:notification-handler name="org.apache.jk.JK_RECEIVE_PACKET
78  * @jmx:notification-handler name="org.apache.jk.JK_FLUSH
79  */

80 public class ChannelSocket extends JkHandler
81     implements NotificationBroadcaster JavaDoc, JkChannel {
82     private static org.apache.commons.logging.Log log=
83         org.apache.commons.logging.LogFactory.getLog( ChannelSocket.class );
84
85     int startPort=8009;
86     int maxPort=8019; // 0 for backward compat.
87
int port=startPort;
88     InetAddress JavaDoc inet;
89     int serverTimeout;
90     boolean tcpNoDelay=true; // nodelay to true by default
91
int linger=100;
92     int socketTimeout;
93
94     long requestCount=0;
95     
96     /* Turning this to true will reduce the latency with about 20%.
97        But it requires changes in tomcat to make sure client-requested
98        flush() is honored ( on my test, I got 367->433 RPS and
99        52->35ms average time with a simple servlet )
100     */

101     static final boolean BUFFER_WRITE=false;
102     
103     ThreadPool tp=ThreadPool.createThreadPool(true);
104
105     /* ==================== Tcp socket options ==================== */
106
107     /**
108      * @jmx:managed-constructor description="default constructor"
109      */

110     public ChannelSocket() {
111         // This should be integrated with the domain setup
112
}
113     
114     public ThreadPool getThreadPool() {
115         return tp;
116     }
117
118     public long getRequestCount() {
119         return requestCount;
120     }
121     
122     /** Set the port for the ajp13 channel.
123      * To support seemless load balancing and jni, we treat this
124      * as the 'base' port - we'll try up until we find one that is not
125      * used. We'll also provide the 'difference' to the main coyote
126      * handler - that will be our 'sessionID' and the position in
127      * the scoreboard and the suffix for the unix domain socket.
128      *
129      * @jmx:managed-attribute description="Port to listen" access="READ_WRITE"
130      */

131     public void setPort( int port ) {
132         this.startPort=port;
133         this.port=port;
134         this.maxPort=port+10;
135     }
136
137     public int getPort() {
138         return port;
139     }
140
141     public void setAddress(InetAddress JavaDoc inet) {
142         this.inet=inet;
143     }
144
145     /**
146      * @jmx:managed-attribute description="Bind on a specified address" access="READ_WRITE"
147      */

148     public void setAddress(String JavaDoc inet) {
149         try {
150             this.inet= InetAddress.getByName( inet );
151         } catch( Exception JavaDoc ex ) {
152             log.error("Error parsing "+inet,ex);
153         }
154     }
155
156     public String JavaDoc getAddress() {
157         if( inet!=null)
158             return inet.toString();
159         return "/0.0.0.0";
160     }
161
162     /**
163      * Sets the timeout in ms of the server sockets created by this
164      * server. This method allows the developer to make servers
165      * more or less responsive to having their server sockets
166      * shut down.
167      *
168      * <p>By default this value is 1000ms.
169      */

170     public void setServerTimeout(int timeout) {
171     this.serverTimeout = timeout;
172     }
173     public int getServerTimeout() {
174         return serverTimeout;
175     }
176
177     public void setTcpNoDelay( boolean b ) {
178     tcpNoDelay=b;
179     }
180
181     public boolean getTcpNoDelay() {
182         return tcpNoDelay;
183     }
184     
185     public void setSoLinger( int i ) {
186     linger=i;
187     }
188
189     public int getSoLinger() {
190         return linger;
191     }
192     
193     public void setSoTimeout( int i ) {
194     socketTimeout=i;
195     }
196
197     public int getSoTimeout() {
198     return socketTimeout;
199     }
200
201     public void setMaxPort( int i ) {
202         maxPort=i;
203     }
204
205     public int getMaxPort() {
206         return maxPort;
207     }
208
209     /** At startup we'll look for the first free port in the range.
210         The difference between this port and the beggining of the range
211         is the 'id'.
212         This is usefull for lb cases ( less config ).
213     */

214     public int getInstanceId() {
215         return port-startPort;
216     }
217
218     /** If set to false, the thread pool will be created in
219      * non-daemon mode, and will prevent main from exiting
220      */

221     public void setDaemon( boolean b ) {
222         tp.setDaemon( b );
223     }
224
225     public boolean getDaemon() {
226         return tp.getDaemon();
227     }
228
229
230     public void setMaxThreads( int i ) {
231         if( log.isDebugEnabled()) log.debug("Setting maxThreads " + i);
232         tp.setMaxThreads(i);
233     }
234     
235     public void setMinSpareThreads( int i ) {
236         if( log.isDebugEnabled()) log.debug("Setting minSpareThreads " + i);
237             tp.setMinSpareThreads(i);
238     }
239
240     public void setMaxSpareThreads( int i ) {
241         if( log.isDebugEnabled()) log.debug("Setting maxSpareThreads " + i);
242             tp.setMaxSpareThreads(i);
243     }
244
245     public int getMaxThreads() {
246         return tp.getMaxThreads();
247     }
248     
249     public int getMinSpareThreads() {
250         return tp.getMinSpareThreads();
251     }
252
253     public int getMaxSpareThreads() {
254         return tp.getMaxSpareThreads();
255     }
256
257     public void setBacklog(int i) {
258     }
259     
260     
261     /* ==================== ==================== */
262     ServerSocket JavaDoc sSocket;
263     final int socketNote=1;
264     final int isNote=2;
265     final int osNote=3;
266     final int notifNote=4;
267     boolean paused = false;
268
269     public void pause() throws Exception JavaDoc {
270         synchronized(this) {
271             paused = true;
272             unLockSocket();
273         }
274     }
275
276     public void resume() throws Exception JavaDoc {
277         synchronized(this) {
278             paused = false;
279             notify();
280         }
281     }
282
283
284     public void accept( MsgContext ep ) throws IOException JavaDoc {
285         if( sSocket==null ) return;
286         synchronized(this) {
287             while(paused) {
288                 try{
289                     wait();
290                 } catch(InterruptedException JavaDoc ie) {
291                     //Ignore, since can't happen
292
}
293             }
294         }
295         Socket JavaDoc s=sSocket.accept();
296         ep.setNote( socketNote, s );
297         if(log.isDebugEnabled() )
298             log.debug("Accepted socket " + s );
299         if( linger > 0 )
300             s.setSoLinger( true, linger);
301         if( socketTimeout > 0 )
302             s.setSoTimeout( socketTimeout );
303         
304         s.setTcpNoDelay( tcpNoDelay ); // set socket tcpnodelay state
305

306         requestCount++;
307
308         InputStream JavaDoc is=new BufferedInputStream JavaDoc(s.getInputStream());
309         OutputStream JavaDoc os;
310         if( BUFFER_WRITE )
311             os = new BufferedOutputStream JavaDoc( s.getOutputStream());
312         else
313             os = s.getOutputStream();
314         ep.setNote( isNote, is );
315         ep.setNote( osNote, os );
316         ep.setControl( tp );
317     }
318
319     public void resetCounters() {
320         requestCount=0;
321     }
322
323     /** Called after you change some fields at runtime using jmx.
324         Experimental for now.
325     */

326     public void reinit() throws IOException JavaDoc {
327         destroy();
328         init();
329     }
330
331     /**
332      * @jmx:managed-operation
333      */

334     public void init() throws IOException JavaDoc {
335         // Find a port.
336
if (startPort == 0) {
337             port = 0;
338             if(log.isInfoEnabled())
339                 log.info("JK: ajp13 disabling channelSocket");
340             running = true;
341             return;
342         }
343         if (maxPort < startPort)
344             maxPort = startPort;
345         for( int i=startPort; i<=maxPort; i++ ) {
346             try {
347                 if( inet == null ) {
348                     sSocket = new ServerSocket JavaDoc( i, 0 );
349                 } else {
350                     sSocket=new ServerSocket JavaDoc( i, 0, inet );
351                 }
352                 port=i;
353                 break;
354             } catch( IOException JavaDoc ex ) {
355                 if(log.isInfoEnabled())
356                     log.info("Port busy " + i + " " + ex.toString());
357                 continue;
358             }
359         }
360
361         if( sSocket==null ) {
362             log.error("Can't find free port " + startPort + " " + maxPort );
363             return;
364         }
365         if(log.isInfoEnabled())
366             log.info("JK: ajp13 listening on " + getAddress() + ":" + port );
367
368         // If this is not the base port and we are the 'main' channleSocket and
369
// SHM didn't already set the localId - we'll set the instance id
370
if( "channelSocket".equals( name ) &&
371             port != startPort &&
372             (wEnv.getLocalId()==0) ) {
373             wEnv.setLocalId( port - startPort );
374         }
375         if( serverTimeout > 0 )
376             sSocket.setSoTimeout( serverTimeout );
377
378         // XXX Reverse it -> this is a notification generator !!
379
if( next==null && wEnv!=null ) {
380             if( nextName!=null )
381                 setNext( wEnv.getHandler( nextName ) );
382             if( next==null )
383                 next=wEnv.getHandler( "dispatch" );
384             if( next==null )
385                 next=wEnv.getHandler( "request" );
386         }
387         JMXRequestNote =wEnv.getNoteId( WorkerEnv.ENDPOINT_NOTE, "requestNote");
388         running = true;
389
390         // Run a thread that will accept connections.
391
// XXX Try to find a thread first - not sure how...
392
if( this.domain != null ) {
393             try {
394                 tpOName=new ObjectName JavaDoc(domain + ":type=ThreadPool,name=" +
395                                        getChannelName());
396
397                 Registry.getRegistry(null, null)
398                     .registerComponent(tp, tpOName, null);
399
400                 rgOName = new ObjectName JavaDoc
401                     (domain+":type=GlobalRequestProcessor,name=" + getChannelName());
402                 Registry.getRegistry(null, null)
403                     .registerComponent(global, rgOName, null);
404             } catch (Exception JavaDoc e) {
405                 log.error("Can't register threadpool" );
406             }
407         }
408
409         tp.start();
410         SocketAcceptor acceptAjp=new SocketAcceptor( this );
411         tp.runIt( acceptAjp);
412
413     }
414
415     ObjectName JavaDoc tpOName;
416     ObjectName JavaDoc rgOName;
417     RequestGroupInfo global=new RequestGroupInfo();
418     int JMXRequestNote;
419
420     public void start() throws IOException JavaDoc{
421         if( sSocket==null )
422             init();
423     }
424
425     public void stop() throws IOException JavaDoc {
426         destroy();
427     }
428
429     public void registerRequest(Request req, MsgContext ep, int count) {
430         if(this.domain != null) {
431             try {
432                 RequestInfo rp=req.getRequestProcessor();
433                 rp.setGlobalProcessor(global);
434                 ObjectName JavaDoc roname = new ObjectName JavaDoc
435                     (getDomain() + ":type=RequestProcessor,worker="+
436                      getChannelName()+",name=JkRequest" +count);
437                 ep.setNote(JMXRequestNote, roname);
438                         
439                 Registry.getRegistry(null, null).registerComponent( rp, roname, null);
440             } catch( Exception JavaDoc ex ) {
441                 log.warn("Error registering request");
442             }
443         }
444     }
445
446     public void open(MsgContext ep) throws IOException JavaDoc {
447     }
448
449     
450     public void close(MsgContext ep) throws IOException JavaDoc {
451         Socket JavaDoc s=(Socket JavaDoc)ep.getNote( socketNote );
452         s.close();
453     }
454
455     private void unLockSocket() throws IOException JavaDoc {
456         // Need to create a connection to unlock the accept();
457
Socket JavaDoc s;
458         InetAddress JavaDoc ladr = inet;
459
460         if (ladr == null || "0.0.0.0".equals(ladr.getHostAddress())) {
461             ladr = InetAddress.getLocalHost();
462         }
463         s=new Socket JavaDoc(ladr, port );
464         // setting soLinger to a small value will help shutdown the
465
// connection quicker
466
s.setSoLinger(true, 0);
467
468     s.close();
469     }
470
471     public void destroy() throws IOException JavaDoc {
472         running = false;
473         try {
474             /* If we disabled the channel return */
475             if (port == 0)
476                 return;
477             tp.shutdown();
478
479         if(!paused) {
480         unLockSocket();
481         }
482
483             sSocket.close(); // XXX?
484

485             if( tpOName != null ) {
486                 Registry.getRegistry(null, null).unregisterComponent(tpOName);
487             }
488             if( rgOName != null ) {
489                 Registry.getRegistry(null, null).unregisterComponent(rgOName);
490             }
491         } catch(Exception JavaDoc e) {
492             log.info("Error shutting down the channel " + port + " " +
493                     e.toString());
494             if( log.isDebugEnabled() ) log.debug("Trace", e);
495         }
496     }
497
498     public int send( Msg msg, MsgContext ep)
499         throws IOException JavaDoc
500     {
501         msg.end(); // Write the packet header
502
byte buf[]=msg.getBuffer();
503         int len=msg.getLen();
504         
505         if(log.isTraceEnabled() )
506             log.trace("send() " + len + " " + buf[4] );
507
508         OutputStream JavaDoc os=(OutputStream JavaDoc)ep.getNote( osNote );
509         os.write( buf, 0, len );
510         return len;
511     }
512
513     public int flush( Msg msg, MsgContext ep)
514         throws IOException JavaDoc
515     {
516         if( BUFFER_WRITE ) {
517             OutputStream JavaDoc os=(OutputStream JavaDoc)ep.getNote( osNote );
518             os.flush();
519         }
520         return 0;
521     }
522
523     public int receive( Msg msg, MsgContext ep )
524         throws IOException JavaDoc
525     {
526         if (log.isDebugEnabled()) {
527             log.debug("receive() ");
528         }
529
530         byte buf[]=msg.getBuffer();
531         int hlen=msg.getHeaderLength();
532         
533     // XXX If the length in the packet header doesn't agree with the
534
// actual number of bytes read, it should probably return an error
535
// value. Also, callers of this method never use the length
536
// returned -- should probably return true/false instead.
537

538         int rd = this.read(ep, buf, 0, hlen );
539         
540         if(rd < 0) {
541             // Most likely normal apache restart.
542
// log.warn("Wrong message " + rd );
543
return rd;
544         }
545
546         msg.processHeader();
547
548         /* After processing the header we know the body
549            length
550         */

551         int blen=msg.getLen();
552         
553     // XXX check if enough space - it's assert()-ed !!!
554

555     int total_read = 0;
556         
557         total_read = this.read(ep, buf, hlen, blen);
558         
559         if ((total_read <= 0) && (blen > 0)) {
560             log.warn("can't read body, waited #" + blen);
561             return -1;
562         }
563         
564         if (total_read != blen) {
565              log.warn( "incomplete read, waited #" + blen +
566                         " got only " + total_read);
567             return -2;
568         }
569         
570     return total_read;
571     }
572     
573     /**
574      * Read N bytes from the InputStream, and ensure we got them all
575      * Under heavy load we could experience many fragmented packets
576      * just read Unix Network Programming to recall that a call to
577      * read didn't ensure you got all the data you want
578      *
579      * from read() Linux manual
580      *
581      * On success, the number of bytes read is returned (zero indicates end
582      * of file),and the file position is advanced by this number.
583      * It is not an error if this number is smaller than the number of bytes
584      * requested; this may happen for example because fewer bytes
585      * are actually available right now (maybe because we were close to
586      * end-of-file, or because we are reading from a pipe, or from a
587      * terminal), or because read() was interrupted by a signal.
588      * On error, -1 is returned, and errno is set appropriately. In this
589      * case it is left unspecified whether the file position (if any) changes.
590      *
591      **/

592     public int read( MsgContext ep, byte[] b, int offset, int len)
593         throws IOException JavaDoc
594     {
595         InputStream JavaDoc is=(InputStream JavaDoc)ep.getNote( isNote );
596         int pos = 0;
597         int got;
598
599         while(pos < len) {
600             try {
601                 got = is.read(b, pos + offset, len - pos);
602             } catch(SocketException JavaDoc sex) {
603                 if(pos > 0) {
604                     log.info("Error reading data after "+pos+"bytes",sex);
605                 } else {
606                     log.debug("Error reading data", sex);
607                 }
608                 got = -1;
609             }
610             if (log.isTraceEnabled()) {
611                 log.trace("read() " + b + " " + (b==null ? 0: b.length) + " " +
612                           offset + " " + len + " = " + got );
613             }
614
615             // connection just closed by remote.
616
if (got <= 0) {
617                 // This happens periodically, as apache restarts
618
// periodically.
619
// It should be more gracefull ! - another feature for Ajp14
620
// log.warn( "server has closed the current connection (-1)" );
621
return -3;
622             }
623
624             pos += got;
625         }
626         return pos;
627     }
628     
629     protected boolean running=true;
630     
631     /** Accept incoming connections, dispatch to the thread pool
632      */

633     void acceptConnections() {
634         if( log.isDebugEnabled() )
635             log.debug("Accepting ajp connections on " + port);
636         while( running ) {
637         try{
638                 MsgContext ep=new MsgContext();
639                 ep.setSource(this);
640                 ep.setWorkerEnv( wEnv );
641                 this.accept(ep);
642
643                 if( !running ) break;
644                 
645                 // Since this is a long-running connection, we don't care
646
// about the small GC
647
SocketConnection ajpConn=
648                     new SocketConnection(this, ep);
649                 tp.runIt( ajpConn );
650         }catch(Exception JavaDoc ex) {
651                 if (running)
652                     log.warn("Exception executing accept" ,ex);
653         }
654         }
655     }
656
657     /** Process a single ajp connection.
658      */

659     void processConnection(MsgContext ep) {
660         try {
661             MsgAjp recv=new MsgAjp();
662             while( running ) {
663                 if(paused) { // Drop the connection on pause
664
break;
665                 }
666                 int status= this.receive( recv, ep );
667                 if( status <= 0 ) {
668                     if( status==-3)
669                         log.debug( "server has been restarted or reset this connection" );
670                     else
671                         log.warn("Closing ajp connection " + status );
672                     break;
673                 }
674                 ep.setLong( MsgContext.TIMER_RECEIVED, System.currentTimeMillis());
675                 
676                 ep.setType( 0 );
677                 // Will call next
678
status= this.invoke( recv, ep );
679                 if( status!= JkHandler.OK ) {
680                     log.warn("processCallbacks status " + status );
681                     break;
682                 }
683             }
684         } catch( Exception JavaDoc ex ) {
685             String JavaDoc msg = ex.getMessage();
686             if( msg != null && msg.indexOf( "Connection reset" ) >= 0)
687                 log.debug( "Server has been restarted or reset this connection");
688             else if (msg != null && msg.indexOf( "Read timed out" ) >=0 )
689                 log.info( "connection timeout reached");
690             else
691                 log.error( "Error, processing connection", ex);
692         } finally {
693             /*
694              * Whatever happened to this connection (remote closed it, timeout, read error)
695              * the socket SHOULD be closed, or we may be in situation where the webserver
696              * will continue to think the socket is still open and will forward request
697              * to tomcat without receiving ever a reply
698              */

699             try {
700                 this.close( ep );
701             }
702             catch( Exception JavaDoc e) {
703                 log.error( "Error, closing connection", e);
704             }
705             try{
706                 Request req = (Request)ep.getRequest();
707                 if( req != null ) {
708                     ObjectName JavaDoc roname = (ObjectName JavaDoc)ep.getNote(JMXRequestNote);
709                     if( roname != null ) {
710                         Registry.getRegistry(null, null).unregisterComponent(roname);
711                     }
712                     req.getRequestProcessor().setGlobalProcessor(null);
713                 }
714             } catch( Exception JavaDoc ee) {
715                 log.error( "Error, releasing connection",ee);
716             }
717         }
718     }
719
720     // XXX This should become handleNotification
721
public int invoke( Msg msg, MsgContext ep ) throws IOException JavaDoc {
722         int type=ep.getType();
723
724         switch( type ) {
725         case JkHandler.HANDLE_RECEIVE_PACKET:
726             if( log.isDebugEnabled()) log.debug("RECEIVE_PACKET ?? ");
727             return receive( msg, ep );
728         case JkHandler.HANDLE_SEND_PACKET:
729             return send( msg, ep );
730         case JkHandler.HANDLE_FLUSH:
731             return flush( msg, ep );
732         }
733
734         if( log.isDebugEnabled() )
735             log.debug("Call next " + type + " " + next);
736
737         // Send notification
738
if( nSupport!=null ) {
739             Notification JavaDoc notif=(Notification JavaDoc)ep.getNote(notifNote);
740             if( notif==null ) {
741                 notif=new Notification JavaDoc("channelSocket.message", ep, requestCount );
742                 ep.setNote( notifNote, notif);
743             }
744             nSupport.sendNotification(notif);
745         }
746
747         if( next != null ) {
748             return next.invoke( msg, ep );
749         } else {
750             log.info("No next ");
751         }
752
753         return OK;
754     }
755     
756     public boolean isSameAddress(MsgContext ep) {
757         Socket JavaDoc s=(Socket JavaDoc)ep.getNote( socketNote );
758         return isSameAddress( s.getLocalAddress(), s.getInetAddress());
759     }
760     
761     public String JavaDoc getChannelName() {
762         String JavaDoc encodedAddr = "";
763         if (inet != null && !"0.0.0.0".equals(inet.getHostAddress())) {
764             encodedAddr = getAddress();
765             if (encodedAddr.startsWith("/"))
766                 encodedAddr = encodedAddr.substring(1);
767         encodedAddr = URLEncoder.encode(encodedAddr) + "-";
768         }
769         return ("jk-" + encodedAddr + port);
770     }
771     
772     /**
773      * Return <code>true</code> if the specified client and server addresses
774      * are the same. This method works around a bug in the IBM 1.1.8 JVM on
775      * Linux, where the address bytes are returned reversed in some
776      * circumstances.
777      *
778      * @param server The server's InetAddress
779      * @param client The client's InetAddress
780      */

781     public static boolean isSameAddress(InetAddress JavaDoc server, InetAddress JavaDoc client)
782     {
783     // Compare the byte array versions of the two addresses
784
byte serverAddr[] = server.getAddress();
785     byte clientAddr[] = client.getAddress();
786     if (serverAddr.length != clientAddr.length)
787         return (false);
788     boolean match = true;
789     for (int i = 0; i < serverAddr.length; i++) {
790         if (serverAddr[i] != clientAddr[i]) {
791         match = false;
792         break;
793         }
794     }
795     if (match)
796         return (true);
797
798     // Compare the reversed form of the two addresses
799
for (int i = 0; i < serverAddr.length; i++) {
800         if (serverAddr[i] != clientAddr[(serverAddr.length-1)-i])
801         return (false);
802     }
803     return (true);
804     }
805
806     public void sendNewMessageNotification(Notification JavaDoc notification) {
807         if( nSupport!= null )
808             nSupport.sendNotification(notification);
809     }
810
811     private NotificationBroadcasterSupport JavaDoc nSupport= null;
812
813     public void addNotificationListener(NotificationListener JavaDoc listener,
814                                         NotificationFilter JavaDoc filter,
815                                         Object JavaDoc handback)
816             throws IllegalArgumentException JavaDoc
817     {
818         if( nSupport==null ) nSupport=new NotificationBroadcasterSupport JavaDoc();
819         nSupport.addNotificationListener(listener, filter, handback);
820     }
821
822     public void removeNotificationListener(NotificationListener JavaDoc listener)
823             throws ListenerNotFoundException JavaDoc
824     {
825         if( nSupport!=null)
826             nSupport.removeNotificationListener(listener);
827     }
828
829     MBeanNotificationInfo JavaDoc notifInfo[]=new MBeanNotificationInfo JavaDoc[0];
830
831     public void setNotificationInfo( MBeanNotificationInfo JavaDoc info[]) {
832         this.notifInfo=info;
833     }
834
835     public MBeanNotificationInfo JavaDoc[] getNotificationInfo() {
836         return notifInfo;
837     }
838 }
839
840 class SocketAcceptor implements ThreadPoolRunnable {
841     ChannelSocket wajp;
842     
843     SocketAcceptor(ChannelSocket wajp ) {
844         this.wajp=wajp;
845     }
846
847     public Object JavaDoc[] getInitData() {
848         return null;
849     }
850
851     public void runIt(Object JavaDoc thD[]) {
852         wajp.acceptConnections();
853     }
854 }
855
856 class SocketConnection implements ThreadPoolRunnable {
857     ChannelSocket wajp;
858     MsgContext ep;
859
860     SocketConnection(ChannelSocket wajp, MsgContext ep) {
861         this.wajp=wajp;
862         this.ep=ep;
863     }
864
865
866     public Object JavaDoc[] getInitData() {
867         return null;
868     }
869     
870     public void runIt(Object JavaDoc perTh[]) {
871         wajp.processConnection(ep);
872         ep = null;
873     }
874 }
875
Popular Tags