1 16 17 package org.apache.jk.common; 18 19 import java.net.URLEncoder ; 20 import java.io.File ; 21 import java.io.FileOutputStream ; 22 import java.io.IOException ; 23 import javax.management.ObjectName ; 24 25 import org.apache.commons.modeler.Registry; 26 import org.apache.jk.core.JkHandler; 27 import org.apache.jk.core.Msg; 28 import org.apache.jk.core.MsgContext; 29 import org.apache.jk.core.JkChannel; 30 import org.apache.jk.core.WorkerEnv; 31 import org.apache.coyote.Request; 32 import org.apache.coyote.RequestGroupInfo; 33 import org.apache.coyote.RequestInfo; 34 import org.apache.tomcat.util.threads.ThreadPool; 35 import org.apache.tomcat.util.threads.ThreadPoolRunnable; 36 37 38 42 public class ChannelUn extends JniHandler implements JkChannel { 43 static final int CH_OPEN=4; 44 static final int CH_CLOSE=5; 45 static final int CH_READ=6; 46 static final int CH_WRITE=7; 47 48 String file; 49 ThreadPool tp = ThreadPool.createThreadPool(true); 50 51 52 53 public ThreadPool getThreadPool() { 54 return tp; 55 } 56 57 public void setFile( String f ) { 58 file=f; 59 } 60 61 public String getFile() { 62 return file; 63 } 64 65 66 int socketNote=1; 67 int isNote=2; 68 int osNote=3; 69 70 int localId=0; 71 72 public void init() throws IOException { 73 if( file==null ) { 74 log.debug("No file, disabling unix channel"); 75 return; 76 } 78 if( wEnv!=null && wEnv.getLocalId() != 0 ) { 79 localId=wEnv.getLocalId(); 80 } 81 82 if( localId != 0 ) { 83 file=file+ localId; 84 } 85 File socketFile=new File ( file ); 86 if( !socketFile.isAbsolute() ) { 87 String home=wEnv.getJkHome(); 88 if( home==null ) { 89 log.debug("No jkhome"); 90 } else { 91 File homef=new File ( home ); 92 socketFile=new File ( homef, file ); 93 log.debug( "Making the file absolute " +socketFile); 94 } 95 } 96 97 if( ! socketFile.exists() ) { 98 try { 99 FileOutputStream fos=new FileOutputStream (socketFile); 100 fos.write( 1 ); 101 fos.close(); 102 } catch( Throwable t ) { 103 log.error("Attempting to create the file failed, disabling channel" 104 + socketFile); 105 return; 106 } 107 } 108 if (!socketFile.delete()) { 110 log.error( "Can't remove socket file " + socketFile); 111 return; 112 } 113 114 115 super.initNative( "channel.un:" + file ); 116 117 if( apr==null || ! apr.isLoaded() ) { 118 log.debug("Apr is not available, disabling unix channel "); 119 apr=null; 120 return; 121 } 122 123 setNativeAttribute( "file", file ); 125 127 setNativeAttribute( "listen", "10" ); 128 130 if( next==null && wEnv!=null ) { 132 if( nextName!=null ) 133 setNext( wEnv.getHandler( nextName ) ); 134 if( next==null ) 135 next=wEnv.getHandler( "dispatch" ); 136 if( next==null ) 137 next=wEnv.getHandler( "request" ); 138 } 139 140 super.initJkComponent(); 141 JMXRequestNote =wEnv.getNoteId( WorkerEnv.ENDPOINT_NOTE, "requestNote"); 142 if( this.domain != null ) { 144 try { 145 tpOName=new ObjectName (domain + ":type=ThreadPool,name=" + 146 getChannelName()); 147 148 Registry.getRegistry(null, null) 149 .registerComponent(tp, tpOName, null); 150 151 rgOName = new ObjectName 152 (domain+":type=GlobalRequestProcessor,name=" + getChannelName()); 153 Registry.getRegistry(null, null) 154 .registerComponent(global, rgOName, null); 155 } catch (Exception e) { 156 log.error("Can't register threadpool" ); 157 } 158 } 159 tp.start(); 160 AprAcceptor acceptAjp=new AprAcceptor( this ); 161 tp.runIt( acceptAjp); 162 log.info("JK: listening on unix socket: " + file ); 163 164 } 165 166 ObjectName tpOName; 167 ObjectName rgOName; 168 RequestGroupInfo global=new RequestGroupInfo(); 169 int count = 0; 170 int JMXRequestNote; 171 172 public void start() throws IOException { 173 } 174 175 public void destroy() throws IOException { 176 if( apr==null ) return; 177 try { 178 if( tp != null ) 179 tp.shutdown(); 180 181 super.destroyJkComponent(); 183 184 if(tpOName != null) { 185 Registry.getRegistry(null, null).unregisterComponent(tpOName); 186 } 187 if(rgOName != null) { 188 Registry.getRegistry(null, null).unregisterComponent(rgOName); 189 } 190 } catch(Exception e) { 191 log.error("Error in destroy",e); 192 } 193 } 194 195 public void registerRequest(Request req, MsgContext ep, int count) { 196 if(this.domain != null) { 197 try { 198 199 RequestInfo rp=req.getRequestProcessor(); 200 rp.setGlobalProcessor(global); 201 ObjectName roname = new ObjectName 202 (getDomain() + ":type=RequestProcessor,worker="+ 203 getChannelName()+",name=JkRequest" +count); 204 ep.setNote(JMXRequestNote, roname); 205 206 Registry.getRegistry(null, null).registerComponent( rp, roname, null); 207 } catch( Exception ex ) { 208 log.warn("Error registering request"); 209 } 210 } 211 } 212 213 214 217 public int open(MsgContext ep) throws IOException { 218 return super.nativeDispatch( ep.getMsg(0), ep, CH_OPEN, 1 ); 222 } 223 224 public void close(MsgContext ep) throws IOException { 225 super.nativeDispatch( ep.getMsg(0), ep, CH_CLOSE, 1 ); 226 } 227 228 public int send( Msg msg, MsgContext ep) 229 throws IOException 230 { 231 return super.nativeDispatch( msg, ep, CH_WRITE, 0 ); 232 } 233 234 public int receive( Msg msg, MsgContext ep ) 235 throws IOException 236 { 237 int rc=super.nativeDispatch( msg, ep, CH_READ, 1 ); 238 239 if( rc!=0 ) { 240 log.error("receive error: " + rc, new Throwable ()); 241 return -1; 242 } 243 244 msg.processHeader(); 245 246 if (log.isDebugEnabled()) 247 log.debug("receive: total read = " + msg.getLen()); 248 249 return msg.getLen(); 250 } 251 252 public int flush( Msg msg, MsgContext ep) throws IOException { 253 return OK; 254 } 255 256 public boolean isSameAddress( MsgContext ep ) { 257 return false; } 259 260 boolean running=true; 261 262 264 void acceptConnections() { 265 if( apr==null ) return; 266 267 if( log.isDebugEnabled() ) 268 log.debug("Accepting ajp connections on " + file); 269 270 while( running ) { 271 try { 272 MsgContext ep=this.createMsgContext(); 273 274 int status=this.open(ep); 276 if( status != 0 && status != 2 ) { 277 log.error( "Error acceptin connection on " + file ); 278 break; 279 } 280 281 284 AprConnection ajpConn= new AprConnection(this, ep); 285 tp.runIt( ajpConn ); 286 } catch( Exception ex ) { 287 ex.printStackTrace(); 288 } 289 } 290 } 291 292 294 void processConnection(MsgContext ep) { 295 if( log.isDebugEnabled() ) 296 log.debug( "New ajp connection "); 297 try { 298 MsgAjp recv=new MsgAjp(); 299 while( running ) { 300 int res=this.receive( recv, ep ); 301 if( res<0 ) { 302 break; 304 } 305 ep.setType(0); 306 log.debug( "Process msg "); 307 int status=next.invoke( recv, ep ); 308 } 309 if( log.isDebugEnabled() ) 310 log.debug( "Closing un channel"); 311 try{ 312 Request req = (Request)ep.getRequest(); 313 if( req != null ) { 314 ObjectName roname = (ObjectName )ep.getNote(JMXRequestNote); 315 if( roname != null ) { 316 Registry.getRegistry(null, null).unregisterComponent(roname); 317 } 318 req.getRequestProcessor().setGlobalProcessor(null); 319 } 320 } catch( Exception ee) { 321 log.error( "Error, releasing connection",ee); 322 } 323 this.close( ep ); 324 } catch( Exception ex ) { 325 ex.printStackTrace(); 326 } 327 } 328 329 public int invoke( Msg msg, MsgContext ep ) throws IOException { 330 int type=ep.getType(); 331 332 switch( type ) { 333 case JkHandler.HANDLE_RECEIVE_PACKET: 334 return receive( msg, ep ); 335 case JkHandler.HANDLE_SEND_PACKET: 336 return send( msg, ep ); 337 case JkHandler.HANDLE_FLUSH: 338 return flush( msg, ep ); 339 } 340 341 return OK; 343 } 344 345 public String getChannelName() { 346 String encodedAddr = ""; 347 String address = file; 348 if (address != null) { 349 encodedAddr = "" + address; 350 if (encodedAddr.startsWith("/")) 351 encodedAddr = encodedAddr.substring(1); 352 encodedAddr = URLEncoder.encode(encodedAddr) ; 353 } 354 return ("jk-" + encodedAddr); 355 } 356 357 private static org.apache.commons.logging.Log log= 358 org.apache.commons.logging.LogFactory.getLog( ChannelUn.class ); 359 } 360 361 class AprAcceptor implements ThreadPoolRunnable { 362 ChannelUn wajp; 363 364 AprAcceptor(ChannelUn wajp ) { 365 this.wajp=wajp; 366 } 367 368 public Object [] getInitData() { 369 return null; 370 } 371 372 public void runIt(Object thD[]) { 373 wajp.acceptConnections(); 374 } 375 } 376 377 class AprConnection implements ThreadPoolRunnable { 378 ChannelUn wajp; 379 MsgContext ep; 380 381 AprConnection(ChannelUn wajp, MsgContext ep) { 382 this.wajp=wajp; 383 this.ep=ep; 384 } 385 386 387 public Object [] getInitData() { 388 return null; 389 } 390 391 public void runIt(Object perTh[]) { 392 wajp.processConnection(ep); 393 } 394 } 395 | Popular Tags |