KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > jk > common > ChannelUn


1 /*
2  * Copyright 1999-2004 The Apache Software Foundation
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */

16
17 package org.apache.jk.common;
18
19 import java.net.URLEncoder JavaDoc;
20 import java.io.File JavaDoc;
21 import java.io.FileOutputStream JavaDoc;
22 import java.io.IOException JavaDoc;
23 import javax.management.ObjectName JavaDoc;
24
25 import org.apache.commons.modeler.Registry;
26 import org.apache.jk.core.JkHandler;
27 import org.apache.jk.core.Msg;
28 import org.apache.jk.core.MsgContext;
29 import org.apache.jk.core.JkChannel;
30 import org.apache.jk.core.WorkerEnv;
31 import org.apache.coyote.Request;
32 import org.apache.coyote.RequestGroupInfo;
33 import org.apache.coyote.RequestInfo;
34 import org.apache.tomcat.util.threads.ThreadPool;
35 import org.apache.tomcat.util.threads.ThreadPoolRunnable;
36
37
38 /** Pass messages using unix domain sockets.
39  *
40  * @author Costin Manolache
41  */

42 public class ChannelUn extends JniHandler implements JkChannel {
43     static final int CH_OPEN=4;
44     static final int CH_CLOSE=5;
45     static final int CH_READ=6;
46     static final int CH_WRITE=7;
47
48     String JavaDoc file;
49     ThreadPool tp = ThreadPool.createThreadPool(true);
50
51     /* ==================== Tcp socket options ==================== */
52
53     public ThreadPool getThreadPool() {
54         return tp;
55     }
56     
57     public void setFile( String JavaDoc f ) {
58         file=f;
59     }
60     
61     public String JavaDoc getFile() {
62         return file;
63     }
64     
65     /* ==================== ==================== */
66     int socketNote=1;
67     int isNote=2;
68     int osNote=3;
69     
70     int localId=0;
71     
72     public void init() throws IOException JavaDoc {
73         if( file==null ) {
74             log.debug("No file, disabling unix channel");
75             return;
76             //throw new IOException( "No file for the unix socket channel");
77
}
78         if( wEnv!=null && wEnv.getLocalId() != 0 ) {
79             localId=wEnv.getLocalId();
80         }
81
82         if( localId != 0 ) {
83             file=file+ localId;
84         }
85         File JavaDoc socketFile=new File JavaDoc( file );
86         if( !socketFile.isAbsolute() ) {
87             String JavaDoc home=wEnv.getJkHome();
88             if( home==null ) {
89                 log.debug("No jkhome");
90             } else {
91                 File JavaDoc homef=new File JavaDoc( home );
92                 socketFile=new File JavaDoc( homef, file );
93                 log.debug( "Making the file absolute " +socketFile);
94             }
95         }
96         
97         if( ! socketFile.exists() ) {
98             try {
99                 FileOutputStream JavaDoc fos=new FileOutputStream JavaDoc(socketFile);
100                 fos.write( 1 );
101                 fos.close();
102             } catch( Throwable JavaDoc t ) {
103                 log.error("Attempting to create the file failed, disabling channel"
104                         + socketFile);
105                 return;
106             }
107         }
108         // The socket file cannot be removed ...
109
if (!socketFile.delete()) {
110             log.error( "Can't remove socket file " + socketFile);
111             return;
112         }
113         
114
115         super.initNative( "channel.un:" + file );
116
117         if( apr==null || ! apr.isLoaded() ) {
118             log.debug("Apr is not available, disabling unix channel ");
119             apr=null;
120             return;
121         }
122         
123         // Set properties and call init.
124
setNativeAttribute( "file", file );
125         // unixListenSocket=apr.unSocketListen( file, 10 );
126

127         setNativeAttribute( "listen", "10" );
128         // setNativeAttribute( "debug", "10" );
129

130         // Initialize the thread pool and execution chain
131
if( next==null && wEnv!=null ) {
132             if( nextName!=null )
133                 setNext( wEnv.getHandler( nextName ) );
134             if( next==null )
135                 next=wEnv.getHandler( "dispatch" );
136             if( next==null )
137                 next=wEnv.getHandler( "request" );
138         }
139
140         super.initJkComponent();
141         JMXRequestNote =wEnv.getNoteId( WorkerEnv.ENDPOINT_NOTE, "requestNote");
142         // Run a thread that will accept connections.
143
if( this.domain != null ) {
144             try {
145                 tpOName=new ObjectName JavaDoc(domain + ":type=ThreadPool,name=" +
146                        getChannelName());
147
148                 Registry.getRegistry(null, null)
149             .registerComponent(tp, tpOName, null);
150
151         rgOName = new ObjectName JavaDoc
152             (domain+":type=GlobalRequestProcessor,name=" + getChannelName());
153         Registry.getRegistry(null, null)
154             .registerComponent(global, rgOName, null);
155             } catch (Exception JavaDoc e) {
156                 log.error("Can't register threadpool" );
157             }
158         }
159         tp.start();
160         AprAcceptor acceptAjp=new AprAcceptor( this );
161         tp.runIt( acceptAjp);
162         log.info("JK: listening on unix socket: " + file );
163         
164     }
165
166     ObjectName JavaDoc tpOName;
167     ObjectName JavaDoc rgOName;
168     RequestGroupInfo global=new RequestGroupInfo();
169     int count = 0;
170     int JMXRequestNote;
171
172     public void start() throws IOException JavaDoc {
173     }
174
175     public void destroy() throws IOException JavaDoc {
176         if( apr==null ) return;
177         try {
178             if( tp != null )
179                 tp.shutdown();
180             
181             //apr.unSocketClose( unixListenSocket,3);
182
super.destroyJkComponent();
183
184             if(tpOName != null) {
185         Registry.getRegistry(null, null).unregisterComponent(tpOName);
186         }
187         if(rgOName != null) {
188         Registry.getRegistry(null, null).unregisterComponent(rgOName);
189         }
190         } catch(Exception JavaDoc e) {
191             log.error("Error in destroy",e);
192         }
193     }
194
195     public void registerRequest(Request req, MsgContext ep, int count) {
196     if(this.domain != null) {
197         try {
198
199         RequestInfo rp=req.getRequestProcessor();
200         rp.setGlobalProcessor(global);
201         ObjectName JavaDoc roname = new ObjectName JavaDoc
202             (getDomain() + ":type=RequestProcessor,worker="+
203              getChannelName()+",name=JkRequest" +count);
204         ep.setNote(JMXRequestNote, roname);
205                         
206         Registry.getRegistry(null, null).registerComponent( rp, roname, null);
207         } catch( Exception JavaDoc ex ) {
208         log.warn("Error registering request");
209         }
210     }
211     }
212
213
214     /** Open a connection - since we're listening that will block in
215         accept
216     */

217     public int open(MsgContext ep) throws IOException JavaDoc {
218         // Will associate a jk_endpoint with ep and call open() on it.
219
// jk_channel_un will accept a connection and set the socket info
220
// in the endpoint. MsgContext will represent an active connection.
221
return super.nativeDispatch( ep.getMsg(0), ep, CH_OPEN, 1 );
222     }
223     
224     public void close(MsgContext ep) throws IOException JavaDoc {
225         super.nativeDispatch( ep.getMsg(0), ep, CH_CLOSE, 1 );
226     }
227
228     public int send( Msg msg, MsgContext ep)
229         throws IOException JavaDoc
230     {
231         return super.nativeDispatch( msg, ep, CH_WRITE, 0 );
232     }
233
234     public int receive( Msg msg, MsgContext ep )
235         throws IOException JavaDoc
236     {
237         int rc=super.nativeDispatch( msg, ep, CH_READ, 1 );
238
239         if( rc!=0 ) {
240             log.error("receive error: " + rc, new Throwable JavaDoc());
241             return -1;
242         }
243         
244         msg.processHeader();
245         
246         if (log.isDebugEnabled())
247              log.debug("receive: total read = " + msg.getLen());
248
249     return msg.getLen();
250     }
251
252     public int flush( Msg msg, MsgContext ep) throws IOException JavaDoc {
253     return OK;
254     }
255
256     public boolean isSameAddress( MsgContext ep ) {
257     return false; // Not supporting shutdown on this channel.
258
}
259
260     boolean running=true;
261     
262     /** Accept incoming connections, dispatch to the thread pool
263      */

264     void acceptConnections() {
265         if( apr==null ) return;
266
267         if( log.isDebugEnabled() )
268             log.debug("Accepting ajp connections on " + file);
269         
270         while( running ) {
271             try {
272                 MsgContext ep=this.createMsgContext();
273
274                 // blocking - opening a server connection.
275
int status=this.open(ep);
276                 if( status != 0 && status != 2 ) {
277                     log.error( "Error acceptin connection on " + file );
278                     break;
279                 }
280
281                 // if( log.isDebugEnabled() )
282
// log.debug("Accepted ajp connections ");
283

284                 AprConnection ajpConn= new AprConnection(this, ep);
285                 tp.runIt( ajpConn );
286             } catch( Exception JavaDoc ex ) {
287                 ex.printStackTrace();
288             }
289         }
290     }
291
292     /** Process a single ajp connection.
293      */

294     void processConnection(MsgContext ep) {
295         if( log.isDebugEnabled() )
296             log.debug( "New ajp connection ");
297         try {
298             MsgAjp recv=new MsgAjp();
299             while( running ) {
300                 int res=this.receive( recv, ep );
301                 if( res<0 ) {
302                     // EOS
303
break;
304                 }
305                 ep.setType(0);
306                 log.debug( "Process msg ");
307                 int status=next.invoke( recv, ep );
308             }
309             if( log.isDebugEnabled() )
310                 log.debug( "Closing un channel");
311             try{
312                 Request req = (Request)ep.getRequest();
313                 if( req != null ) {
314                     ObjectName JavaDoc roname = (ObjectName JavaDoc)ep.getNote(JMXRequestNote);
315                     if( roname != null ) {
316                         Registry.getRegistry(null, null).unregisterComponent(roname);
317                     }
318                     req.getRequestProcessor().setGlobalProcessor(null);
319                 }
320             } catch( Exception JavaDoc ee) {
321                 log.error( "Error, releasing connection",ee);
322             }
323             this.close( ep );
324         } catch( Exception JavaDoc ex ) {
325             ex.printStackTrace();
326         }
327     }
328
329     public int invoke( Msg msg, MsgContext ep ) throws IOException JavaDoc {
330         int type=ep.getType();
331
332         switch( type ) {
333         case JkHandler.HANDLE_RECEIVE_PACKET:
334             return receive( msg, ep );
335         case JkHandler.HANDLE_SEND_PACKET:
336             return send( msg, ep );
337         case JkHandler.HANDLE_FLUSH:
338             return flush( msg, ep );
339         }
340
341         // return next.invoke( msg, ep );
342
return OK;
343     }
344
345     public String JavaDoc getChannelName() {
346         String JavaDoc encodedAddr = "";
347         String JavaDoc address = file;
348         if (address != null) {
349             encodedAddr = "" + address;
350             if (encodedAddr.startsWith("/"))
351                 encodedAddr = encodedAddr.substring(1);
352             encodedAddr = URLEncoder.encode(encodedAddr) ;
353         }
354         return ("jk-" + encodedAddr);
355     }
356
357     private static org.apache.commons.logging.Log log=
358         org.apache.commons.logging.LogFactory.getLog( ChannelUn.class );
359 }
360
361 class AprAcceptor implements ThreadPoolRunnable {
362     ChannelUn wajp;
363     
364     AprAcceptor(ChannelUn wajp ) {
365         this.wajp=wajp;
366     }
367
368     public Object JavaDoc[] getInitData() {
369         return null;
370     }
371
372     public void runIt(Object JavaDoc thD[]) {
373         wajp.acceptConnections();
374     }
375 }
376
377 class AprConnection implements ThreadPoolRunnable {
378     ChannelUn wajp;
379     MsgContext ep;
380
381     AprConnection(ChannelUn wajp, MsgContext ep) {
382         this.wajp=wajp;
383         this.ep=ep;
384     }
385
386
387     public Object JavaDoc[] getInitData() {
388         return null;
389     }
390     
391     public void runIt(Object JavaDoc perTh[]) {
392         wajp.processConnection(ep);
393     }
394 }
395
Popular Tags