package org.apache.jk.common;

import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URLEncoder;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketException;

import javax.management.ListenerNotFoundException;
import javax.management.MBeanNotificationInfo;
import javax.management.Notification;
import javax.management.NotificationBroadcaster;
import javax.management.NotificationBroadcasterSupport;
import javax.management.NotificationFilter;
import javax.management.NotificationListener;
import javax.management.ObjectName;

import org.apache.commons.modeler.Registry;
import org.apache.jk.core.JkHandler;
import org.apache.jk.core.Msg;
import org.apache.jk.core.MsgContext;
import org.apache.jk.core.JkChannel;
import org.apache.jk.core.WorkerEnv;
import org.apache.coyote.Request;
import org.apache.coyote.RequestGroupInfo;
import org.apache.coyote.RequestInfo;
import org.apache.tomcat.util.threads.ThreadPool;
import org.apache.tomcat.util.threads.ThreadPoolRunnable;

/**
 * JK channel that accepts connections on a TCP server socket and exchanges
 * {@link Msg} packets over them.
 *
 * <p>{@link #init()} binds the first free port in the range
 * <code>startPort..maxPort</code>, then hands a {@link SocketAcceptor} to the
 * thread pool. The acceptor loops in {@link #acceptConnections()}, and every
 * accepted socket is processed by a {@link SocketConnection} task running
 * {@link #processConnection(MsgContext)}.
 *
 * <p>The socket and its streams are attached to the per-connection
 * {@link MsgContext} as notes (see {@link #socketNote}, {@link #isNote},
 * {@link #osNote}).
 */
public class ChannelSocket extends JkHandler
    implements NotificationBroadcaster, JkChannel {

    private static org.apache.commons.logging.Log log =
        org.apache.commons.logging.LogFactory.getLog( ChannelSocket.class );

    /** First port tried by {@link #init()}; 0 disables the channel. */
    int startPort = 8009;
    /** Last port tried by {@link #init()}. */
    int maxPort = 8019;
    /** Port actually bound (equals startPort until init() picks another). */
    int port = startPort;
    /** Address to bind to; null means all local addresses. */
    InetAddress inet;
    /** SO_TIMEOUT applied to the server socket when greater than 0. */
    int serverTimeout;
    boolean tcpNoDelay = true;
    /** SO_LINGER value applied to accepted sockets when greater than 0. */
    int linger = 100;
    /** SO_TIMEOUT applied to accepted sockets when greater than 0. */
    int socketTimeout;

    /** Number of connections accepted since the last resetCounters(). */
    long requestCount = 0;

    /** When true the socket output stream is buffered and flush() is needed. */
    static final boolean BUFFER_WRITE = false;

    ThreadPool tp = ThreadPool.createThreadPool(true);

    /* ==================== Tcp socket options ==================== */

    public ChannelSocket() {
    }

    public ThreadPool getThreadPool() {
        return tp;
    }

    public long getRequestCount() {
        return requestCount;
    }

    /**
     * Sets the first port to try. Also resets the current port to the same
     * value and sets the last port to try to <code>port + 10</code>.
     */
    public void setPort( int port ) {
        this.startPort = port;
        this.port = port;
        this.maxPort = port + 10;
    }

    public int getPort() {
        return port;
    }

    public void setAddress(InetAddress inet) {
        this.inet = inet;
    }

    /**
     * Resolves the given host name or address and uses it as bind address.
     * A resolution failure is logged and leaves the current address unchanged.
     */
    public void setAddress(String inet) {
        try {
            this.inet = InetAddress.getByName( inet );
        } catch( Exception ex ) {
            log.error("Error parsing " + inet, ex);
        }
    }

    /**
     * Returns the bind address as produced by InetAddress.toString(), or
     * "/0.0.0.0" when no address was configured.
     */
    public String getAddress() {
        if( inet != null ) {
            return inet.toString();
        }
        return "/0.0.0.0";
    }

    /** Sets the timeout applied to the server socket by init(). */
    public void setServerTimeout(int timeout) {
        this.serverTimeout = timeout;
    }

    public int getServerTimeout() {
        return serverTimeout;
    }

    public void setTcpNoDelay( boolean b ) {
        tcpNoDelay = b;
    }

    public boolean getTcpNoDelay() {
        return tcpNoDelay;
    }

    public void setSoLinger( int i ) {
        linger = i;
    }

    public int getSoLinger() {
        return linger;
    }

    public void setSoTimeout( int i ) {
        socketTimeout = i;
    }

    public int getSoTimeout() {
        return socketTimeout;
    }

    public void setMaxPort( int i ) {
        maxPort = i;
    }

    public int getMaxPort() {
        return maxPort;
    }

    /** Returns the offset of the bound port from the first configured port. */
    public int getInstanceId() {
        return port - startPort;
    }

    /** Forwards the daemon flag to the thread pool. */
    public void setDaemon( boolean b ) {
        tp.setDaemon( b );
    }

    public boolean getDaemon() {
        return tp.getDaemon();
    }

    public void setMaxThreads( int i ) {
        if( log.isDebugEnabled() ) {
            log.debug("Setting maxThreads " + i);
        }
        tp.setMaxThreads(i);
    }

    public void setMinSpareThreads( int i ) {
        if( log.isDebugEnabled() ) {
            log.debug("Setting minSpareThreads " + i);
        }
        tp.setMinSpareThreads(i);
    }

    public void setMaxSpareThreads( int i ) {
        if( log.isDebugEnabled() ) {
            log.debug("Setting maxSpareThreads " + i);
        }
        tp.setMaxSpareThreads(i);
    }

    public int getMaxThreads() {
        return tp.getMaxThreads();
    }

    public int getMinSpareThreads() {
        return tp.getMinSpareThreads();
    }

    public int getMaxSpareThreads() {
        return tp.getMaxSpareThreads();
    }

    /** Accepted for configuration compatibility; the value is ignored. */
    public void setBacklog(int i) {
    }

    /* ==================== Channel state ==================== */

    /** Listening socket; null until init() has bound a port. */
    ServerSocket sSocket;
    /** MsgContext note slot holding the accepted Socket. */
    final int socketNote = 1;
    /** MsgContext note slot holding the connection InputStream. */
    final int isNote = 2;
    /** MsgContext note slot holding the connection OutputStream. */
    final int osNote = 3;
    /** MsgContext note slot holding the cached Notification. */
    final int notifNote = 4;
    /**
     * Set by pause()/resume() and polled by acceptor and connection threads,
     * hence volatile.
     */
    volatile boolean paused = false;

    /**
     * Marks the channel paused and opens a throw-away connection to it so a
     * thread blocked in ServerSocket.accept() wakes up and sees the flag.
     */
    public void pause() throws Exception {
        synchronized(this) {
            paused = true;
            unLockSocket();
        }
    }

    /** Clears the paused flag and wakes a thread waiting in accept(). */
    public void resume() throws Exception {
        synchronized(this) {
            paused = false;
            notify();
        }
    }

    /**
     * Blocks until the channel is not paused, accepts one connection, applies
     * the configured socket options and stores the socket and its streams as
     * notes on <code>ep</code>. Returns immediately when no server socket is
     * bound.
     *
     * @param ep context that receives the socket, stream and control notes
     * @throws IOException if accepting or configuring the socket fails; the
     *         accepted socket is closed before the exception propagates
     */
    public void accept( MsgContext ep ) throws IOException {
        if( sSocket == null ) {
            return;
        }
        synchronized(this) {
            while( paused ) {
                try {
                    wait();
                } catch( InterruptedException ignored ) {
                    // Keep waiting: the loop re-checks the paused flag.
                }
            }
        }
        Socket s = sSocket.accept();
        boolean configured = false;
        try {
            ep.setNote( socketNote, s );
            if( log.isDebugEnabled() ) {
                log.debug("Accepted socket " + s );
            }
            if( linger > 0 ) {
                s.setSoLinger( true, linger );
            }
            if( socketTimeout > 0 ) {
                s.setSoTimeout( socketTimeout );
            }

            s.setTcpNoDelay( tcpNoDelay );
            requestCount++;

            InputStream is = new BufferedInputStream(s.getInputStream());
            OutputStream os;
            if( BUFFER_WRITE ) {
                os = new BufferedOutputStream( s.getOutputStream() );
            } else {
                os = s.getOutputStream();
            }
            ep.setNote( isNote, is );
            ep.setNote( osNote, os );
            ep.setControl( tp );
            configured = true;
        } finally {
            if( !configured ) {
                // Do not leak the descriptor when setup fails half-way.
                try {
                    s.close();
                } catch( IOException ignored ) {
                    // The original failure is the one worth reporting.
                }
            }
        }
    }

    public void resetCounters() {
        requestCount = 0;
    }

    /** Shuts the channel down and initializes it again. */
    public void reinit() throws IOException {
        destroy();
        init();
    }

    /**
     * Binds the server socket, resolves the next handler, registers the
     * thread pool and request group with JMX (when a domain is set) and
     * starts the acceptor.
     *
     * <p>A start port of 0 disables the channel. If no port in
     * <code>startPort..maxPort</code> can be bound, an error is logged,
     * <code>sSocket</code> is left null and nothing is started.
     */
    public void init() throws IOException {
        if( startPort == 0 ) {
            port = 0;
            if( log.isInfoEnabled() ) {
                log.info("JK: ajp13 disabling channelSocket");
            }
            running = true;
            return;
        }
        if( maxPort < startPort ) {
            maxPort = startPort;
        }

        // Bind into a local first: after reinit() the field may still hold
        // the previous, already closed socket, which must not be mistaken
        // for a successful bind.
        ServerSocket bound = null;
        for( int i = startPort; i <= maxPort; i++ ) {
            try {
                if( inet == null ) {
                    bound = new ServerSocket( i, 0 );
                } else {
                    bound = new ServerSocket( i, 0, inet );
                }
                port = i;
                break;
            } catch( IOException ex ) {
                if( log.isInfoEnabled() ) {
                    log.info("Port busy " + i + " " + ex.toString());
                }
                continue;
            }
        }
        sSocket = bound;

        if( sSocket == null ) {
            log.error("Can't find free port " + startPort + " " + maxPort );
            return;
        }
        if( log.isInfoEnabled() ) {
            log.info("JK: ajp13 listening on " + getAddress() + ":" + port );
        }

        // The default channel derives the worker's local id from the port
        // offset when it had to move off the first port.
        if( "channelSocket".equals( name ) &&
            port != startPort &&
            (wEnv.getLocalId() == 0) ) {
            wEnv.setLocalId( port - startPort );
        }
        if( serverTimeout > 0 ) {
            sSocket.setSoTimeout( serverTimeout );
        }

        if( next == null && wEnv != null ) {
            if( nextName != null ) {
                setNext( wEnv.getHandler( nextName ) );
            }
            if( next == null ) {
                next = wEnv.getHandler( "dispatch" );
            }
            if( next == null ) {
                next = wEnv.getHandler( "request" );
            }
        }
        JMXRequestNote = wEnv.getNoteId( WorkerEnv.ENDPOINT_NOTE, "requestNote");
        running = true;

        if( this.domain != null ) {
            try {
                tpOName = new ObjectName(domain + ":type=ThreadPool,name=" +
                                         getChannelName());

                Registry.getRegistry(null, null)
                    .registerComponent(tp, tpOName, null);

                rgOName = new ObjectName
                    (domain + ":type=GlobalRequestProcessor,name=" + getChannelName());
                Registry.getRegistry(null, null)
                    .registerComponent(global, rgOName, null);
            } catch( Exception e ) {
                log.error("Can't register threadpool", e);
            }
        }

        tp.start();
        SocketAcceptor acceptAjp = new SocketAcceptor( this );
        tp.runIt( acceptAjp );
    }

    /** JMX name of the thread pool; null until registered by init(). */
    ObjectName tpOName;
    /** JMX name of the request group; null until registered by init(). */
    ObjectName rgOName;
    RequestGroupInfo global = new RequestGroupInfo();
    /** MsgContext note slot holding a request's JMX ObjectName. */
    int JMXRequestNote;

    /** Initializes the channel unless a server socket already exists. */
    public void start() throws IOException {
        if( sSocket == null ) {
            init();
        }
    }

    public void stop() throws IOException {
        destroy();
    }

    /**
     * Registers the request's processor info with JMX under this channel's
     * name and remembers the ObjectName on <code>ep</code> so it can be
     * unregistered when the connection ends. Does nothing without a domain.
     */
    public void registerRequest(Request req, MsgContext ep, int count) {
        if( this.domain != null ) {
            try {
                RequestInfo rp = req.getRequestProcessor();
                rp.setGlobalProcessor(global);
                ObjectName roname = new ObjectName
                    (getDomain() + ":type=RequestProcessor,worker=" +
                     getChannelName() + ",name=JkRequest" + count);
                ep.setNote(JMXRequestNote, roname);

                Registry.getRegistry(null, null).registerComponent( rp, roname, null);
            } catch( Exception ex ) {
                log.warn("Error registering request", ex);
            }
        }
    }

    public void open(MsgContext ep) throws IOException {
    }

    /**
     * Closes the socket stored on <code>ep</code>, if any. A context that
     * never received a socket is ignored.
     */
    public void close(MsgContext ep) throws IOException {
        Socket s = (Socket)ep.getNote( socketNote );
        if( s != null ) {
            s.close();
        }
    }

    /**
     * Opens and immediately closes a connection to the channel's own port so
     * that a thread blocked in ServerSocket.accept() returns.
     */
    private void unLockSocket() throws IOException {
        InetAddress ladr = inet;

        if( ladr == null || "0.0.0.0".equals(ladr.getHostAddress()) ) {
            ladr = InetAddress.getLocalHost();
        }
        Socket s = new Socket(ladr, port );
        try {
            // Linger 0: drop the connection at once instead of lingering.
            s.setSoLinger(true, 0);
        } finally {
            s.close();
        }
    }

    /**
     * Stops the channel: clears the running flag, shuts the thread pool down,
     * wakes the acceptor, closes the server socket and unregisters the JMX
     * components. Failures are logged, not propagated.
     */
    public void destroy() throws IOException {
        running = false;
        try {
            // Channel disabled by init(): nothing was started.
            if( port == 0 ) {
                return;
            }
            tp.shutdown();

            if( sSocket != null ) {
                if( !paused ) {
                    try {
                        unLockSocket();
                    } catch( IOException ex ) {
                        // Failing to wake the acceptor must not prevent the
                        // server socket from being closed below.
                        if( log.isDebugEnabled() ) {
                            log.debug("Error unlocking the acceptor", ex);
                        }
                    }
                }
                sSocket.close();
            }

            if( tpOName != null ) {
                Registry.getRegistry(null, null).unregisterComponent(tpOName);
            }
            if( rgOName != null ) {
                Registry.getRegistry(null, null).unregisterComponent(rgOName);
            }
        } catch( Exception e ) {
            log.info("Error shutting down the channel " + port + " " +
                     e.toString());
            if( log.isDebugEnabled() ) {
                log.debug("Trace", e);
            }
        }
    }

    /**
     * Finishes the message and writes its buffer to the connection's output
     * stream.
     *
     * @return the number of bytes written
     */
    public int send( Msg msg, MsgContext ep )
        throws IOException
    {
        msg.end();
        byte buf[] = msg.getBuffer();
        int len = msg.getLen();

        if( log.isTraceEnabled() ) {
            log.trace("send() " + len + " " + buf[4] );
        }

        OutputStream os = (OutputStream)ep.getNote( osNote );
        os.write( buf, 0, len );
        return len;
    }

    /** Flushes the output stream when writes are buffered; returns 0. */
    public int flush( Msg msg, MsgContext ep )
        throws IOException
    {
        if( BUFFER_WRITE ) {
            OutputStream os = (OutputStream)ep.getNote( osNote );
            os.flush();
        }
        return 0;
    }

    /**
     * Reads one packet into the message buffer: first the header, then the
     * body whose length the header announces.
     *
     * @return the body length read; the negative value from
     *         {@link #read(MsgContext, byte[], int, int)} if the header could
     *         not be read; -1 if the body could not be read or does not fit
     *         the buffer; -2 on an incomplete body
     */
    public int receive( Msg msg, MsgContext ep )
        throws IOException
    {
        if( log.isDebugEnabled() ) {
            log.debug("receive() ");
        }

        byte buf[] = msg.getBuffer();
        int hlen = msg.getHeaderLength();

        int rd = this.read(ep, buf, 0, hlen );

        if( rd < 0 ) {
            return rd;
        }

        msg.processHeader();

        // After processing the header the message reports the body length.
        int blen = msg.getLen();

        // The length comes from the peer: refuse a body that cannot fit
        // behind the header rather than overrunning the buffer.
        if( blen > buf.length - hlen ) {
            log.warn("body too large for buffer, waited #" + blen);
            return -1;
        }

        int total_read = this.read(ep, buf, hlen, blen);

        if( (total_read <= 0) && (blen > 0) ) {
            log.warn("can't read body, waited #" + blen);
            return -1;
        }

        if( total_read != blen ) {
            log.warn( "incomplete read, waited #" + blen +
                      " got only " + total_read);
            return -2;
        }

        return total_read;
    }

    /**
     * Reads exactly <code>len</code> bytes from the connection's input stream
     * into <code>b</code> starting at <code>offset</code>.
     *
     * @return <code>len</code> on success, or -3 if the stream ended, a read
     *         returned no data, or a SocketException occurred
     */
    public int read( MsgContext ep, byte[] b, int offset, int len )
        throws IOException
    {
        InputStream is = (InputStream)ep.getNote( isNote );
        int pos = 0;
        int got;

        while( pos < len ) {
            try {
                got = is.read(b, pos + offset, len - pos);
            } catch( SocketException sex ) {
                if( pos > 0 ) {
                    log.info("Error reading data after "+pos+"bytes",sex);
                } else {
                    log.debug("Error reading data", sex);
                }
                got = -1;
            }
            if( log.isTraceEnabled() ) {
                log.trace("read() " + b + " " + (b==null ? 0: b.length) + " " +
                          offset + " " + len + " = " + got );
            }

            if( got <= 0 ) {
                return -3;
            }

            pos += got;
        }
        return pos;
    }

    /**
     * Cleared by destroy() and polled by the acceptor and connection
     * threads, hence volatile.
     */
    protected volatile boolean running = true;

    /**
     * Accept loop run by the acceptor thread: accepts connections and hands
     * each one to the thread pool until the channel stops running.
     */
    void acceptConnections() {
        if( log.isDebugEnabled() ) {
            log.debug("Accepting ajp connections on " + port);
        }
        while( running ) {
            try {
                MsgContext ep = new MsgContext();
                ep.setSource(this);
                ep.setWorkerEnv( wEnv );
                this.accept(ep);

                if( !running ) {
                    // Typically the wake-up connection made during
                    // shutdown: release it instead of dropping it.
                    this.close(ep);
                    break;
                }

                SocketConnection ajpConn =
                    new SocketConnection(this, ep);
                tp.runIt( ajpConn );
            } catch( Exception ex ) {
                if( running ) {
                    log.warn("Exception executing accept", ex);
                }
            }
        }
    }

    /**
     * Per-connection loop: receives packets and passes them down the handler
     * chain until the channel stops or pauses, a receive fails, or a handler
     * returns a status other than OK. The socket is always closed and the
     * request's JMX registration removed at the end.
     */
    void processConnection(MsgContext ep) {
        try {
            MsgAjp recv = new MsgAjp();
            while( running ) {
                if( paused ) {
                    break;
                }
                int status = this.receive( recv, ep );
                if( status <= 0 ) {
                    if( status == -3 ) {
                        log.debug( "server has been restarted or reset this connection" );
                    } else {
                        log.warn("Closing ajp connection " + status );
                    }
                    break;
                }
                ep.setLong( MsgContext.TIMER_RECEIVED, System.currentTimeMillis());

                ep.setType( 0 );
                status = this.invoke( recv, ep );
                if( status != JkHandler.OK ) {
                    log.warn("processCallbacks status " + status );
                    break;
                }
            }
        } catch( Exception ex ) {
            String msg = ex.getMessage();
            if( msg != null && msg.indexOf( "Connection reset" ) >= 0 ) {
                log.debug( "Server has been restarted or reset this connection");
            } else if( msg != null && msg.indexOf( "Read timed out" ) >= 0 ) {
                log.info( "connection timeout reached");
            } else {
                log.error( "Error, processing connection", ex);
            }
        } finally {
            try {
                this.close( ep );
            } catch( Exception e ) {
                log.error( "Error, closing connection", e);
            }
            try {
                Request req = (Request)ep.getRequest();
                if( req != null ) {
                    ObjectName roname = (ObjectName)ep.getNote(JMXRequestNote);
                    if( roname != null ) {
                        Registry.getRegistry(null, null).unregisterComponent(roname);
                    }
                    req.getRequestProcessor().setGlobalProcessor(null);
                }
            } catch( Exception ee ) {
                log.error( "Error, releasing connection", ee);
            }
        }
    }

    /**
     * Handles channel operations selected by the context type (receive, send,
     * flush). Any other type is broadcast to notification listeners, if
     * present, and forwarded to the next handler.
     *
     * @return the result of the channel operation or of the next handler;
     *         OK when there is no next handler
     */
    public int invoke( Msg msg, MsgContext ep ) throws IOException {
        int type = ep.getType();

        switch( type ) {
        case JkHandler.HANDLE_RECEIVE_PACKET:
            if( log.isDebugEnabled() ) {
                log.debug("RECEIVE_PACKET ?? ");
            }
            return receive( msg, ep );
        case JkHandler.HANDLE_SEND_PACKET:
            return send( msg, ep );
        case JkHandler.HANDLE_FLUSH:
            return flush( msg, ep );
        }

        if( log.isDebugEnabled() ) {
            log.debug("Call next " + type + " " + next);
        }

        if( nSupport != null ) {
            // One Notification object is created per context and reused.
            Notification notif = (Notification)ep.getNote(notifNote);
            if( notif == null ) {
                notif = new Notification("channelSocket.message", ep, requestCount );
                ep.setNote( notifNote, notif);
            }
            nSupport.sendNotification(notif);
        }

        if( next != null ) {
            return next.invoke( msg, ep );
        } else {
            log.info("No next ");
        }

        return OK;
    }

    /**
     * Tells whether the connection's local and remote addresses are the
     * same, as defined by {@link #isSameAddress(InetAddress, InetAddress)}.
     */
    public boolean isSameAddress(MsgContext ep) {
        Socket s = (Socket)ep.getNote( socketNote );
        return isSameAddress( s.getLocalAddress(), s.getInetAddress());
    }

    /**
     * Builds the name used in JMX ObjectNames: "jk-" followed by the port,
     * with the URL-encoded bind address and a dash in between when a
     * specific (non 0.0.0.0) address is configured.
     */
    public String getChannelName() {
        String encodedAddr = "";
        if( inet != null && !"0.0.0.0".equals(inet.getHostAddress()) ) {
            encodedAddr = getAddress();
            if( encodedAddr.startsWith("/") ) {
                encodedAddr = encodedAddr.substring(1);
            }
            encodedAddr = URLEncoder.encode(encodedAddr) + "-";
        }
        return ("jk-" + encodedAddr + port);
    }

    /**
     * Compares two addresses byte by byte. They are considered the same when
     * the raw bytes match either in the same order or with the client bytes
     * in reverse order.
     *
     * @return true if the addresses match in either byte order
     */
    public static boolean isSameAddress(InetAddress server, InetAddress client)
    {
        byte serverAddr[] = server.getAddress();
        byte clientAddr[] = client.getAddress();
        if( serverAddr.length != clientAddr.length ) {
            return (false);
        }
        boolean match = true;
        for( int i = 0; i < serverAddr.length; i++ ) {
            if( serverAddr[i] != clientAddr[i] ) {
                match = false;
                break;
            }
        }
        if( match ) {
            return (true);
        }

        // Second chance: same bytes in reverse order.
        for( int i = 0; i < serverAddr.length; i++ ) {
            if( serverAddr[i] != clientAddr[(serverAddr.length-1)-i] ) {
                return (false);
            }
        }
        return (true);
    }

    /** Sends the notification to registered listeners, if there are any. */
    public void sendNewMessageNotification(Notification notification) {
        if( nSupport != null ) {
            nSupport.sendNotification(notification);
        }
    }

    /** Created lazily when the first listener is added. */
    private NotificationBroadcasterSupport nSupport = null;

    public void addNotificationListener(NotificationListener listener,
                                        NotificationFilter filter,
                                        Object handback)
        throws IllegalArgumentException
    {
        if( nSupport == null ) {
            nSupport = new NotificationBroadcasterSupport();
        }
        nSupport.addNotificationListener(listener, filter, handback);
    }

    public void removeNotificationListener(NotificationListener listener)
        throws ListenerNotFoundException
    {
        if( nSupport != null ) {
            nSupport.removeNotificationListener(listener);
        }
    }

    MBeanNotificationInfo notifInfo[] = new MBeanNotificationInfo[0];

    public void setNotificationInfo( MBeanNotificationInfo info[] ) {
        this.notifInfo = info;
    }

    public MBeanNotificationInfo[] getNotificationInfo() {
        return notifInfo;
    }
}

/** Thread pool task that runs the channel's accept loop. */
class SocketAcceptor implements ThreadPoolRunnable {
    ChannelSocket wajp;

    SocketAcceptor(ChannelSocket wajp ) {
        this.wajp = wajp;
    }

    public Object[] getInitData() {
        return null;
    }

    public void runIt(Object thD[]) {
        wajp.acceptConnections();
    }
}

/** Thread pool task that processes one accepted connection. */
class SocketConnection implements ThreadPoolRunnable {
    ChannelSocket wajp;
    MsgContext ep;

    SocketConnection(ChannelSocket wajp, MsgContext ep) {
        this.wajp = wajp;
        this.ep = ep;
    }

    public Object[] getInitData() {
        return null;
    }

    public void runIt(Object perTh[]) {
        wajp.processConnection(ep);
        // Drop the reference once the connection has been processed.
        ep = null;
    }
}