KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > cocoon > transformation > helpers > DefaultIncludeCacheManager


1 /*
2  * Copyright 1999-2004 The Apache Software Foundation.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */

16 package org.apache.cocoon.transformation.helpers;
17
18 import java.io.IOException JavaDoc;
19 import java.net.URL JavaDoc;
20
21 import org.apache.avalon.framework.activity.Disposable;
22 import org.apache.avalon.framework.component.Component;
23 import org.apache.avalon.framework.logger.AbstractLogEnabled;
24 import org.apache.avalon.framework.parameters.ParameterException;
25 import org.apache.avalon.framework.parameters.Parameterizable;
26 import org.apache.avalon.framework.parameters.Parameters;
27 import org.apache.avalon.framework.service.ServiceException;
28 import org.apache.avalon.framework.service.ServiceManager;
29 import org.apache.avalon.framework.service.Serviceable;
30 import org.apache.avalon.framework.thread.ThreadSafe;
31 import org.apache.cocoon.ProcessingException;
32 import org.apache.cocoon.caching.CachedResponse;
33 import org.apache.cocoon.components.sax.XMLDeserializer;
34 import org.apache.cocoon.components.sax.XMLSerializer;
35 import org.apache.cocoon.components.sax.XMLTeePipe;
36 import org.apache.cocoon.components.source.SourceUtil;
37 import org.apache.cocoon.components.thread.RunnableManager;
38 import org.apache.cocoon.environment.CocoonRunnable;
39 import org.apache.cocoon.xml.XMLConsumer;
40 import org.apache.excalibur.source.Source;
41 import org.apache.excalibur.source.SourceException;
42 import org.apache.excalibur.source.SourceResolver;
43 import org.apache.excalibur.source.SourceValidity;
44 import org.apache.excalibur.store.Store;
45 import org.xml.sax.SAXException JavaDoc;
46 import EDU.oswego.cs.dl.util.concurrent.CountDown;
47
48 /**
49  * Default implementation of a {@link IncludeCacheManager}.
50  *
51  * This implementation requires a configuration, if preemptive
52  * loading is used:
53  * <parameter name="preemptive-loader-url" value="some url"/>
54  *
55  * This is a url inside cocoon, that contains the preemptive loader
56  * url; it must be specified absolute (with http://...)
57  * If this loader cannot be started, only an error is logged into the
58  * log, so actually cached content is never updated!
59  *
60  * @author <a HREF="mailto:cziegeler@apache.org">Carsten Ziegeler</a>
61  * @version CVS $Id: DefaultIncludeCacheManager.java 189932 2005-06-10 09:33:14Z sylvain $
62  * @since 2.1
63  */

64 public final class DefaultIncludeCacheManager
65     extends AbstractLogEnabled
66     implements IncludeCacheManager,
67                 ThreadSafe,
68                 Serviceable,
69                 Disposable,
70                 Parameterizable,
71                 Component {
72
73     private ServiceManager manager;
74     
75     private SourceResolver resolver;
76     
77     private Store store;
78     
79     private IncludeCacheStorageProxy defaultCacheStorage;
80     
81     private String JavaDoc preemptiveLoaderURI;
82     
83     /**
84      * @see IncludeCacheManager#getSession(org.apache.avalon.framework.parameters.Parameters)
85      */

86     public IncludeCacheManagerSession getSession(Parameters pars) {
87         String JavaDoc sourceURI = pars.getParameter("source", null);
88         IncludeCacheManagerSession session;
89         if ( null == sourceURI ) {
90             session = new IncludeCacheManagerSession(pars, this.defaultCacheStorage);
91         } else {
92             Source source = null;
93             try {
94                 source = this.resolver.resolveURI(sourceURI);
95                 IncludeCacheStorageProxy proxy = new ModifiableSourceIncludeCacheStorageProxy(this.resolver, source.getURI(), this.getLogger());
96                 session = new IncludeCacheManagerSession(pars, proxy);
97             } catch (Exception JavaDoc local) {
98                 session = new IncludeCacheManagerSession(pars, this.defaultCacheStorage);
99                 this.getLogger().warn("Error creating writeable source.", local);
100             } finally {
101                 this.resolver.release(source);
102             }
103         }
104         if (session.isPreemptive()) {
105             if ( null == this.preemptiveLoaderURI ) {
106                 this.getLogger().error("Preemptive loading is turned off because the preemptive-loader-url is not configured.");
107                 session.setPreemptive(false);
108             } else {
109                 if ( !PreemptiveLoader.getInstance().alive ) {
110
111                     if (this.getLogger().isDebugEnabled()) {
112                         this.getLogger().debug("Booting preemptive loader: " + this.preemptiveLoaderURI);
113                     }
114                     PreemptiveBooter thread = new PreemptiveBooter( this.preemptiveLoaderURI );
115                     try
116                     {
117                         final RunnableManager runnableManager = (RunnableManager)this.manager.lookup( RunnableManager.ROLE );
118                         runnableManager.execute( thread );
119                         this.manager.release( runnableManager );
120                     }
121                     catch( final ServiceException se )
122                     {
123                         getLogger().error( "Cannot lookup RunnableManager", se );
124                     }
125                 }
126             }
127         }
128         if (this.getLogger().isDebugEnabled()) {
129             this.getLogger().debug("Creating cache manager session: " + session);
130         }
131         return session;
132     }
133
134     /**
135      * @see IncludeCacheManager#load(java.lang.String, IncludeCacheManagerSession)
136      */

137     public String JavaDoc load(String JavaDoc uri,
138                         IncludeCacheManagerSession session)
139     throws IOException JavaDoc, SourceException {
140         if (this.getLogger().isDebugEnabled()) {
141             this.getLogger().debug("Load " + uri + " for session " + session);
142         }
143         
144         // first make the URI absolute
145
if ( uri.indexOf("://") == -1) {
146             final Source source = session.resolveURI(uri, this.resolver);
147             uri = source.getURI();
148         }
149         
150         // if we are not processing in parallel (or do preemptive)
151
// then we don't have to do anything in this method - everything
152
// is done in the stream method.
153

154         // if we are processing in parallel (and not preemptive) then....
155
if ( session.isParallel() && !session.isPreemptive()) {
156             
157             // first look-up if we have a valid stored response
158
IncludeCacheStorageProxy storage = session.getCacheStorageProxy();
159             CachedResponse response = (CachedResponse)storage.get(uri);
160             if ( null != response) {
161                 SourceValidity[] validities = response.getValidityObjects();
162                 
163                 // if we are valid and do not purging
164
if ( !session.isPurging()
165                       && validities[0].isValid() == SourceValidity.VALID) {
166                     if (this.getLogger().isDebugEnabled()) {
167                         this.getLogger().debug("Using cached response for parallel processing.");
168                     }
169                     session.add(uri, response.getResponse());
170                     return uri;
171                 } else {
172                     // response is not used
173
storage.remove(uri);
174                 }
175             }
176
177             if (this.getLogger().isDebugEnabled()) {
178                 this.getLogger().debug("Starting parallel thread for loading " + uri);
179             }
180             // now we start a parallel thread, this thread gets all required avalon components
181
// so it does not have to lookup them by itself
182
try {
183                 XMLSerializer serializer = (XMLSerializer)this.manager.lookup(XMLSerializer.ROLE);
184                 Source source = session.resolveURI(uri, this.resolver);
185
186                 LoaderThread loader = new LoaderThread(source, serializer, this.manager);
187                 // Load the included content in a thread that inherits the current thread's environment
188
Thread JavaDoc thread = new Thread JavaDoc(new CocoonRunnable(loader));
189                 session.add(uri, loader);
190                 thread.start();
191                 if (this.getLogger().isDebugEnabled()) {
192                     this.getLogger().debug("Thread started for " + uri);
193                 }
194             } catch (ServiceException ce) {
195                 throw new SourceException("Unable to lookup thread pool or xml serializer.", ce);
196             } catch (Exception JavaDoc e) {
197                 throw new SourceException("Unable to get pooled thread.", e);
198             }
199         }
200         return uri;
201     }
202
203     /**
204      * @see IncludeCacheManager#stream(java.lang.String, IncludeCacheManagerSession, XMLConsumer)
205      */

206     public void stream(String JavaDoc uri,
207                         IncludeCacheManagerSession session,
208                         XMLConsumer handler)
209     throws IOException JavaDoc, SourceException, SAXException JavaDoc {
210
211         if (this.getLogger().isDebugEnabled()) {
212             this.getLogger().debug("Stream " + uri + " for session " + session);
213         }
214
215         // if we are processing in parallel (and not preemptive) then....
216
if ( session.isParallel() && !session.isPreemptive()) {
217             
218             // get either the cached content or the pooled thread
219
Object JavaDoc object = session.get(uri);
220             
221             if ( null == object ) {
222                 // this should never happen!
223
throw new SAXException JavaDoc("No pooled thread found for " + uri);
224             }
225             byte[] result;
226             
227             // is this a pooled thread?
228
if (object instanceof LoaderThread) {
229                 LoaderThread loader = (LoaderThread)object;
230                 
231                 if (this.getLogger().isDebugEnabled()) {
232                     this.getLogger().debug("Waiting for pooled thread to finish loading.");
233                 }
234
235                 // wait for it
236
loader.join();
237
238                 if (this.getLogger().isDebugEnabled()) {
239                     this.getLogger().debug("Pooled thread finished loading.");
240                 }
241                 
242                 // did an exception occur? Then reraise it
243
if ( null != loader.exception) {
244                     if ( loader.exception instanceof SAXException JavaDoc ) {
245                         throw (SAXException JavaDoc)loader.exception;
246                     } else if (loader.exception instanceof SourceException ) {
247                         throw (SourceException)loader.exception;
248                     } else if (loader.exception instanceof IOException JavaDoc) {
249                         throw (IOException JavaDoc)loader.exception;
250                     } else {
251                         throw new SAXException JavaDoc("Exception.", loader.exception);
252                     }
253                 }
254                 
255                 if (this.getLogger().isDebugEnabled()) {
256                     this.getLogger().debug("Streaming from pooled thread.");
257                 }
258                 result = loader.content;
259
260                 // cache the response (remember preemptive is off)
261
if (session.getExpires() > 0) {
262                     SourceValidity[] validities = new SourceValidity[1];
263                     validities[0] = session.getExpiresValidity();
264                     CachedResponse response = new CachedResponse(validities, result);
265                     session.getCacheStorageProxy().put(uri, response);
266                 }
267             } else {
268                 if (this.getLogger().isDebugEnabled()) {
269                     this.getLogger().debug("Streaming from cached response.");
270                 }
271
272                 // use the response from the cache
273
result = (byte[])object;
274             }
275             
276             // stream the content
277
XMLDeserializer deserializer = null;
278             try {
279                 deserializer = (XMLDeserializer)this.manager.lookup( XMLDeserializer.ROLE );
280                 deserializer.setConsumer(handler);
281                 deserializer.deserialize(result);
282             } catch (ServiceException ce) {
283                 throw new SAXException JavaDoc("Unable to lookup xml deserializer.", ce);
284             } finally {
285                 this.manager.release( deserializer );
286             }
287             return;
288             
289         } else {
290             // we are not processing parallel
291

292             // first: test for a cached response
293
IncludeCacheStorageProxy storage = session.getCacheStorageProxy();
294             CachedResponse response = (CachedResponse)storage.get(uri);
295             if ( null != response) {
296                 SourceValidity[] validities = response.getValidityObjects();
297                 // if purging is turned off and either the cached response is valid or
298
// we are loading preemptive, then use the cached response
299
if ( !session.isPurging()
300                       && (session.isPreemptive() || validities[0].isValid() == SourceValidity.VALID)) {
301
302                     // stream the content
303
if (this.getLogger().isDebugEnabled()) {
304                         this.getLogger().debug("Streaming from cached response.");
305                     }
306                     XMLDeserializer deserializer = null;
307                     try {
308                         deserializer = (XMLDeserializer)this.manager.lookup( XMLDeserializer.ROLE );
309                         deserializer.setConsumer(handler);
310                         deserializer.deserialize(response.getResponse());
311                     } catch (ServiceException ce) {
312                         throw new SAXException JavaDoc("Unable to lookup xml deserializer.", ce);
313                     } finally {
314                         this.manager.release( deserializer );
315                     }
316                     
317                     // load preemptive if the response is not valid
318
if ( session.getExpires() > 0
319                          && session.isPreemptive()
320                          && validities[0].isValid() != SourceValidity.VALID) {
321                         if (this.getLogger().isDebugEnabled()) {
322                             this.getLogger().debug("Add uri to preemptive loader list " + uri);
323                         }
324                         if (!PreemptiveLoader.getInstance().alive) {
325                             this.getLogger().error("Preemptive loader has not started yet.");
326                         }
327                         PreemptiveLoader.getInstance().add(session.getCacheStorageProxy(), uri, session.getExpires());
328                     }
329                     return;
330  
331                 } else {
332                     // cached response is not valid
333
storage.remove(uri);
334                 }
335             }
336         }
337
338         // we are not processing in parallel and have no (valid) cached response
339
XMLSerializer serializer = null;
340         try {
341             final Source source = session.resolveURI(uri, this.resolver);
342             
343             // stream directly (and cache the response)
344
if (this.getLogger().isDebugEnabled()) {
345                 this.getLogger().debug("Streaming directly from source.");
346             }
347             if (session.getExpires() > 0) {
348                 serializer = (XMLSerializer)this.manager.lookup(XMLSerializer.ROLE);
349                 XMLTeePipe tee = new XMLTeePipe(handler, serializer);
350                 
351                 SourceUtil.toSAX(source, tee);
352                 
353                 SourceValidity[] validities = new SourceValidity[1];
354                 validities[0] = session.getExpiresValidity();
355                 CachedResponse response = new CachedResponse(validities,
356                                                              (byte[])serializer.getSAXFragment());
357                 session.getCacheStorageProxy().put(uri, response);
358             } else {
359                 SourceUtil.toSAX(source, handler);
360             }
361             
362         } catch (ProcessingException pe) {
363             throw new SAXException JavaDoc("ProcessingException", pe);
364         } catch (ServiceException e) {
365             throw new SAXException JavaDoc("Unable to lookup xml serializer.", e);
366         } finally {
367             this.manager.release(serializer);
368         }
369     }
370
371     /**
372      * @see IncludeCacheManager#terminateSession(IncludeCacheManagerSession)
373      */

374     public void terminateSession(IncludeCacheManagerSession session) {
375         if (this.getLogger().isDebugEnabled()) {
376             this.getLogger().debug("Terminating cache manager session " + session);
377         }
378         session.cleanup(this.resolver);
379     }
380
381     /**
382      * @see org.apache.avalon.framework.service.Serviceable#service(org.apache.avalon.framework.service.ServiceManager)
383      */

384     public void service(ServiceManager manager) throws ServiceException {
385         this.manager = manager;
386         this.resolver = (SourceResolver)this.manager.lookup(SourceResolver.ROLE);
387     }
388
389     /**
390      * @see org.apache.avalon.framework.activity.Disposable#dispose()
391      */

392     public void dispose() {
393         // stop preemptive loader (if running)
394
PreemptiveLoader.getInstance().stop();
395         if ( null != this.manager ) {
396             this.manager.release( this.resolver);
397             this.manager.release(this.store);
398             this.store = null;
399             this.resolver = null;
400             this.manager = null;
401             this.defaultCacheStorage = null;
402         }
403     }
404
405     /**
406      * @see org.apache.avalon.framework.parameters.Parameterizable#parameterize(org.apache.avalon.framework.parameters.Parameters)
407      */

408     public void parameterize(Parameters parameters) throws ParameterException {
409         this.preemptiveLoaderURI = parameters.getParameter("preemptive-loader-url", null);
410         if ( null != this.preemptiveLoaderURI
411              && this.preemptiveLoaderURI.indexOf("://") == -1) {
412             throw new ParameterException("The preemptive-loader-url must be absolute: " + this.preemptiveLoaderURI);
413         }
414         final String JavaDoc storeRole = parameters.getParameter("use-store", Store.ROLE);
415         try {
416             this.store = (Store)this.manager.lookup(storeRole);
417         } catch (ServiceException e) {
418             throw new ParameterException("Unable to lookup store with role " + storeRole, e);
419         }
420         this.defaultCacheStorage = new StoreIncludeCacheStorageProxy(this.store, this.getLogger());
421     }
422
423     final private static class LoaderThread implements Runnable JavaDoc {
424         
425         final private Source source;
426         final private XMLSerializer serializer;
427         final private CountDown finished;
428         Exception JavaDoc exception;
429         byte[] content;
430         final private ServiceManager manager;
431         
432         public LoaderThread(Source source,
433                             XMLSerializer serializer,
434                             ServiceManager manager) {
435             this.source = source;
436             this.serializer = serializer;
437             this.finished = new CountDown( 1 );
438             this.manager = manager;
439         }
440         
441         public void run() {
442             try {
443                 SourceUtil.toSAX(this.source, this.serializer);
444                 this.content = (byte[])this.serializer.getSAXFragment();
445             } catch (Exception JavaDoc local) {
446                 this.exception = local;
447             } finally {
448                 this.manager.release( this.serializer );
449                 this.finished.release();
450             }
451         }
452         
453         void join() {
454             try {
455                 this.finished.acquire();
456             } catch ( final InterruptedException JavaDoc ie) {
457                 // ignore
458
}
459         }
460     }
461
462     final static class PreemptiveBooter implements Runnable JavaDoc {
463
464         private final String JavaDoc uri;
465         
466         public PreemptiveBooter( final String JavaDoc uri ) {
467             this.uri = uri;
468         }
469         
470         public void run() {
471             try {
472                 URL JavaDoc url = new URL JavaDoc(this.uri);
473                 url.getContent();
474             } catch (Exception JavaDoc ignore) {
475             }
476         }
477     }
478 }
479
Popular Tags