package org.apache.cocoon.transformation.helpers;

import java.io.IOException;
import java.net.URL;

import org.apache.avalon.framework.activity.Disposable;
import org.apache.avalon.framework.component.Component;
import org.apache.avalon.framework.logger.AbstractLogEnabled;
import org.apache.avalon.framework.parameters.ParameterException;
import org.apache.avalon.framework.parameters.Parameterizable;
import org.apache.avalon.framework.parameters.Parameters;
import org.apache.avalon.framework.service.ServiceException;
import org.apache.avalon.framework.service.ServiceManager;
import org.apache.avalon.framework.service.Serviceable;
import org.apache.avalon.framework.thread.ThreadSafe;
import org.apache.cocoon.ProcessingException;
import org.apache.cocoon.caching.CachedResponse;
import org.apache.cocoon.components.sax.XMLDeserializer;
import org.apache.cocoon.components.sax.XMLSerializer;
import org.apache.cocoon.components.sax.XMLTeePipe;
import org.apache.cocoon.components.source.SourceUtil;
import org.apache.cocoon.components.thread.RunnableManager;
import org.apache.cocoon.environment.CocoonRunnable;
import org.apache.cocoon.xml.XMLConsumer;
import org.apache.excalibur.source.Source;
import org.apache.excalibur.source.SourceException;
import org.apache.excalibur.source.SourceResolver;
import org.apache.excalibur.source.SourceValidity;
import org.apache.excalibur.store.Store;
import org.xml.sax.SAXException;

import EDU.oswego.cs.dl.util.concurrent.CountDown;

/**
 * Default implementation of the {@link IncludeCacheManager}.
 *
 * <p>Depending on the flags of the {@link IncludeCacheManagerSession} the
 * manager works in one of three ways:</p>
 * <ul>
 *   <li><b>parallel (and not preemptive)</b>: {@link #load} either picks up a
 *       valid cached response or starts a background thread that serializes
 *       the source; {@link #stream} later waits for that thread and replays
 *       the recorded content.</li>
 *   <li><b>preemptive</b>: {@link #stream} replays a cached response even when
 *       it is no longer valid and queues the uri at the
 *       {@link PreemptiveLoader} for a refresh.</li>
 *   <li><b>otherwise</b>: {@link #stream} replays a valid cached response or
 *       streams directly from the source, recording the content when the
 *       session has an expires value greater than zero.</li>
 * </ul>
 */
public final class DefaultIncludeCacheManager
    extends AbstractLogEnabled
    implements IncludeCacheManager,
               ThreadSafe,
               Serviceable,
               Disposable,
               Parameterizable,
               Component {

    /** The service manager used to look up and release components. */
    private ServiceManager manager;

    /** The source resolver, looked up in {@link #service(ServiceManager)}. */
    private SourceResolver resolver;

    /** The store backing the default cache storage, looked up in {@link #parameterize(Parameters)}. */
    private Store store;

    /** Cache storage used by sessions that do not configure a "source" parameter. */
    private IncludeCacheStorageProxy defaultCacheStorage;

    /** Absolute url requested to boot the preemptive loader; <code>null</code> if not configured. */
    private String preemptiveLoaderURI;

    /**
     * Create a new session for the given parameters.
     *
     * <p>If the parameters contain a "source" value, the session caches into
     * that source; if the source cannot be set up, a warning is logged and the
     * default (store based) cache storage is used instead. For a preemptive
     * session the preemptive loader is booted if it is not alive yet; without
     * a configured preemptive-loader-url the preemptive mode is switched off
     * for the session.</p>
     *
     * @param pars the session configuration
     * @return the new session, never <code>null</code>
     */
    public IncludeCacheManagerSession getSession(Parameters pars) {
        String sourceURI = pars.getParameter("source", null);
        IncludeCacheManagerSession session;
        if ( null == sourceURI ) {
            session = new IncludeCacheManagerSession(pars, this.defaultCacheStorage);
        } else {
            Source source = null;
            try {
                source = this.resolver.resolveURI(sourceURI);
                IncludeCacheStorageProxy proxy = new ModifiableSourceIncludeCacheStorageProxy(this.resolver, source.getURI(), this.getLogger());
                session = new IncludeCacheManagerSession(pars, proxy);
            } catch (Exception local) {
                // fall back to the default storage instead of failing the whole session
                session = new IncludeCacheManagerSession(pars, this.defaultCacheStorage);
                this.getLogger().warn("Error creating writeable source.", local);
            } finally {
                // only the uri of the source is needed by the proxy
                this.resolver.release(source);
            }
        }
        if (session.isPreemptive()) {
            if ( null == this.preemptiveLoaderURI ) {
                this.getLogger().error("Preemptive loading is turned off because the preemptive-loader-url is not configured.");
                session.setPreemptive(false);
            } else if ( !PreemptiveLoader.getInstance().alive ) {
                this.bootPreemptiveLoader();
            }
        }
        if (this.getLogger().isDebugEnabled()) {
            this.getLogger().debug("Creating cache manager session: " + session);
        }
        return session;
    }

    /**
     * Hand a {@link PreemptiveBooter} for the configured url to the
     * {@link RunnableManager}. A failing lookup is logged and otherwise
     * ignored; the runnable manager is released even if the execution
     * request fails.
     */
    private void bootPreemptiveLoader() {
        if (this.getLogger().isDebugEnabled()) {
            this.getLogger().debug("Booting preemptive loader: " + this.preemptiveLoaderURI);
        }
        final PreemptiveBooter booter = new PreemptiveBooter( this.preemptiveLoaderURI );
        try {
            final RunnableManager runnableManager = (RunnableManager)this.manager.lookup( RunnableManager.ROLE );
            try {
                runnableManager.execute( booter );
            } finally {
                this.manager.release( runnableManager );
            }
        } catch( final ServiceException se ) {
            getLogger().error( "Cannot lookup RunnableManager", se );
        }
    }

    /**
     * Prepare the content of the uri for a later call to {@link #stream}.
     *
     * <p>A uri without "://" is resolved through the session and replaced by
     * the uri of the resolved source. Only for a parallel, non preemptive
     * session any further work is done: a valid cached response is registered
     * at the session, otherwise the stale entry is removed from the cache and
     * a new thread is started that loads the source in the background.</p>
     *
     * @param uri     the uri to load
     * @param session the current session
     * @return the (possibly resolved) uri that has to be passed to {@link #stream}
     * @throws IOException     declared by the interface; may be raised while resolving the uri
     * @throws SourceException if the uri cannot be resolved or the loader cannot be started
     */
    public String load(String uri,
                       IncludeCacheManagerSession session)
    throws IOException, SourceException {
        if (this.getLogger().isDebugEnabled()) {
            this.getLogger().debug("Load " + uri + " for session " + session);
        }

        // a uri without a scheme is resolved by the session first
        if ( uri.indexOf("://") == -1) {
            final Source source = session.resolveURI(uri, this.resolver);
            uri = source.getURI();
        }

        // in all other modes the work is done in stream()
        if ( session.isParallel() && !session.isPreemptive()) {

            // first, look for a usable cached response
            IncludeCacheStorageProxy storage = session.getCacheStorageProxy();
            CachedResponse response = (CachedResponse)storage.get(uri);
            if ( null != response) {
                SourceValidity[] validities = response.getValidityObjects();

                if ( !session.isPurging()
                     && validities[0].isValid() == SourceValidity.VALID) {
                    if (this.getLogger().isDebugEnabled()) {
                        this.getLogger().debug("Using cached response for parallel processing.");
                    }
                    session.add(uri, response.getResponse());
                    return uri;
                } else {
                    // purging requested or entry not valid anymore
                    storage.remove(uri);
                }
            }

            if (this.getLogger().isDebugEnabled()) {
                this.getLogger().debug("Starting parallel thread for loading " + uri);
            }
            XMLSerializer serializer = null;
            // true as soon as the loader thread runs; from then on the
            // loader itself releases the serializer
            boolean handedOver = false;
            try {
                serializer = (XMLSerializer)this.manager.lookup(XMLSerializer.ROLE);
                Source source = session.resolveURI(uri, this.resolver);

                LoaderThread loader = new LoaderThread(source, serializer, this.manager);
                Thread thread = new Thread(new CocoonRunnable(loader));
                session.add(uri, loader);
                thread.start();
                handedOver = true;
                if (this.getLogger().isDebugEnabled()) {
                    this.getLogger().debug("Thread started for " + uri);
                }
            } catch (ServiceException ce) {
                throw new SourceException("Unable to lookup thread pool or xml serializer.", ce);
            } catch (Exception e) {
                throw new SourceException("Unable to get pooled thread.", e);
            } finally {
                // the loader never started, so nobody else will release the serializer
                if ( !handedOver && null != serializer ) {
                    this.manager.release(serializer);
                }
            }
        }
        return uri;
    }

    /**
     * Stream the content of the uri to the handler.
     *
     * <p>For a parallel, non preemptive session the content prepared by
     * {@link #load} is replayed; an exception raised by the background loader
     * is rethrown here. In all other cases a cached response is replayed if
     * it may be used, otherwise the source is streamed directly and - if the
     * session has an expires value greater than zero - recorded into the
     * cache.</p>
     *
     * @param uri     the uri as returned by {@link #load}
     * @param session the current session
     * @param handler the receiver of the sax events
     * @throws IOException     if the background loader failed with an io problem,
     *                         or while resolving the uri
     * @throws SourceException if the background loader failed with a source problem,
     *                         or while resolving the uri
     * @throws SAXException    if nothing was loaded for the uri, the wait for the
     *                         loader was interrupted, a required component cannot
     *                         be looked up or streaming fails
     */
    public void stream(String uri,
                       IncludeCacheManagerSession session,
                       XMLConsumer handler)
    throws IOException, SourceException, SAXException {

        if (this.getLogger().isDebugEnabled()) {
            this.getLogger().debug("Stream " + uri + " for session " + session);
        }

        if ( session.isParallel() && !session.isPreemptive()) {

            // load() registered either a loader or the cached bytes
            Object object = session.get(uri);

            if ( null == object ) {
                throw new SAXException("No pooled thread found for " + uri);
            }
            byte[] result;

            if (object instanceof LoaderThread) {
                LoaderThread loader = (LoaderThread)object;

                if (this.getLogger().isDebugEnabled()) {
                    this.getLogger().debug("Waiting for pooled thread to finish loading.");
                }

                if ( !loader.join() ) {
                    // the loader may still be running, its content must not be used
                    throw new SAXException("Interrupted while waiting for " + uri + " to be loaded.");
                }

                if (this.getLogger().isDebugEnabled()) {
                    this.getLogger().debug("Pooled thread finished loading.");
                }

                // rethrow a failure of the loader with its original type if declared
                if ( null != loader.exception) {
                    if ( loader.exception instanceof SAXException ) {
                        throw (SAXException)loader.exception;
                    } else if (loader.exception instanceof SourceException ) {
                        throw (SourceException)loader.exception;
                    } else if (loader.exception instanceof IOException ) {
                        throw (IOException)loader.exception;
                    } else {
                        throw new SAXException("Exception.", loader.exception);
                    }
                }

                if (this.getLogger().isDebugEnabled()) {
                    this.getLogger().debug("Streaming from pooled thread.");
                }
                result = loader.content;

                // remember the freshly loaded content
                if (session.getExpires() > 0) {
                    SourceValidity[] validities = new SourceValidity[1];
                    validities[0] = session.getExpiresValidity();
                    CachedResponse response = new CachedResponse(validities, result);
                    session.getCacheStorageProxy().put(uri, response);
                }
            } else {
                if (this.getLogger().isDebugEnabled()) {
                    this.getLogger().debug("Streaming from cached response.");
                }

                result = (byte[])object;
            }

            // replay the recorded events
            XMLDeserializer deserializer = null;
            try {
                deserializer = (XMLDeserializer)this.manager.lookup( XMLDeserializer.ROLE );
                deserializer.setConsumer(handler);
                deserializer.deserialize(result);
            } catch (ServiceException ce) {
                throw new SAXException("Unable to lookup xml deserializer.", ce);
            } finally {
                this.manager.release( deserializer );
            }
            return;

        } else {

            // not parallel or preemptive: look into the cache
            IncludeCacheStorageProxy storage = session.getCacheStorageProxy();
            CachedResponse response = (CachedResponse)storage.get(uri);
            if ( null != response) {
                SourceValidity[] validities = response.getValidityObjects();
                // a preemptive session uses the response even if it is not valid anymore
                if ( !session.isPurging()
                     && (session.isPreemptive() || validities[0].isValid() == SourceValidity.VALID)) {

                    if (this.getLogger().isDebugEnabled()) {
                        this.getLogger().debug("Streaming from cached response.");
                    }
                    XMLDeserializer deserializer = null;
                    try {
                        deserializer = (XMLDeserializer)this.manager.lookup( XMLDeserializer.ROLE );
                        deserializer.setConsumer(handler);
                        deserializer.deserialize(response.getResponse());
                    } catch (ServiceException ce) {
                        throw new SAXException("Unable to lookup xml deserializer.", ce);
                    } finally {
                        this.manager.release( deserializer );
                    }

                    // an outdated response is refreshed by the preemptive loader
                    if ( session.getExpires() > 0
                         && session.isPreemptive()
                         && validities[0].isValid() != SourceValidity.VALID) {
                        if (this.getLogger().isDebugEnabled()) {
                            this.getLogger().debug("Add uri to preemptive loader list " + uri);
                        }
                        if (!PreemptiveLoader.getInstance().alive) {
                            this.getLogger().error("Preemptive loader has not started yet.");
                        }
                        PreemptiveLoader.getInstance().add(session.getCacheStorageProxy(), uri, session.getExpires());
                    }
                    return;

                } else {
                    // purging requested or entry not valid anymore
                    storage.remove(uri);
                }
            }
        }

        // nothing usable in the cache: stream from the source
        XMLSerializer serializer = null;
        try {
            final Source source = session.resolveURI(uri, this.resolver);

            if (this.getLogger().isDebugEnabled()) {
                this.getLogger().debug("Streaming directly from source.");
            }
            if (session.getExpires() > 0) {
                // record the events while they are sent to the handler
                serializer = (XMLSerializer)this.manager.lookup(XMLSerializer.ROLE);
                XMLTeePipe tee = new XMLTeePipe(handler, serializer);

                SourceUtil.toSAX(source, tee);

                SourceValidity[] validities = new SourceValidity[1];
                validities[0] = session.getExpiresValidity();
                CachedResponse response = new CachedResponse(validities,
                                                             (byte[])serializer.getSAXFragment());
                session.getCacheStorageProxy().put(uri, response);
            } else {
                SourceUtil.toSAX(source, handler);
            }

        } catch (ProcessingException pe) {
            throw new SAXException("ProcessingException", pe);
        } catch (ServiceException e) {
            throw new SAXException("Unable to lookup xml serializer.", e);
        } finally {
            this.manager.release(serializer);
        }
    }

    /**
     * Terminate the session by letting it clean up with the source resolver
     * of this manager.
     *
     * @param session the session to terminate
     */
    public void terminateSession(IncludeCacheManagerSession session) {
        if (this.getLogger().isDebugEnabled()) {
            this.getLogger().debug("Terminating cache manager session " + session);
        }
        session.cleanup(this.resolver);
    }

    /**
     * Store the service manager and look up the source resolver.
     *
     * @param manager the service manager
     * @throws ServiceException if the source resolver cannot be looked up
     */
    public void service(ServiceManager manager) throws ServiceException {
        this.manager = manager;
        this.resolver = (SourceResolver)this.manager.lookup(SourceResolver.ROLE);
    }

    /**
     * Stop the preemptive loader and release all components held by this
     * manager.
     */
    public void dispose() {
        PreemptiveLoader.getInstance().stop();
        if ( null != this.manager ) {
            this.manager.release( this.resolver);
            this.manager.release(this.store);
            this.store = null;
            this.resolver = null;
            this.manager = null;
            this.defaultCacheStorage = null;
        }
    }

    /**
     * Read the configuration.
     *
     * <p>Evaluated parameters: "preemptive-loader-url" (optional, has to
     * contain "://") and "use-store" (role of the store to use, defaults to
     * <code>Store.ROLE</code>). The service manager set by
     * {@link #service(ServiceManager)} is used for the store lookup.</p>
     *
     * @param parameters the configuration
     * @throws ParameterException if the preemptive-loader-url is not absolute
     *                            or the store cannot be looked up
     */
    public void parameterize(Parameters parameters) throws ParameterException {
        this.preemptiveLoaderURI = parameters.getParameter("preemptive-loader-url", null);
        if ( null != this.preemptiveLoaderURI
             && this.preemptiveLoaderURI.indexOf("://") == -1) {
            throw new ParameterException("The preemptive-loader-url must be absolute: " + this.preemptiveLoaderURI);
        }
        final String storeRole = parameters.getParameter("use-store", Store.ROLE);
        try {
            this.store = (Store)this.manager.lookup(storeRole);
        } catch (ServiceException e) {
            throw new ParameterException("Unable to lookup store with role " + storeRole, e);
        }
        this.defaultCacheStorage = new StoreIncludeCacheStorageProxy(this.store, this.getLogger());
    }

    /**
     * Background job that serializes a source and keeps either the recorded
     * content or the exception that made loading fail. The serializer is
     * released by the job itself when it finishes.
     */
    final private static class LoaderThread implements Runnable {

        final private Source source;
        final private XMLSerializer serializer;
        /** Released once by {@link #run()}; {@link #join()} waits on it. */
        final private CountDown finished;
        /** The failure of the load, <code>null</code> if loading succeeded. */
        Exception exception;
        /** The recorded content, set only if loading succeeded. */
        byte[] content;
        final private ServiceManager manager;

        public LoaderThread(Source source,
                            XMLSerializer serializer,
                            ServiceManager manager) {
            this.source = source;
            this.serializer = serializer;
            this.finished = new CountDown( 1 );
            this.manager = manager;
        }

        public void run() {
            try {
                SourceUtil.toSAX(this.source, this.serializer);
                this.content = (byte[])this.serializer.getSAXFragment();
            } catch (Exception local) {
                // reported to the caller of stream()
                this.exception = local;
            } finally {
                this.manager.release( this.serializer );
                this.finished.release();
            }
        }

        /**
         * Wait until {@link #run()} has finished.
         *
         * @return <code>true</code> if the job has finished, <code>false</code>
         *         if the waiting thread was interrupted; in that case the
         *         interrupt status of the thread is set again and neither
         *         {@link #content} nor {@link #exception} may be relied on
         */
        boolean join() {
            try {
                this.finished.acquire();
                return true;
            } catch ( final InterruptedException ie) {
                // keep the interrupt visible for the code further up the stack
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }

    /**
     * Job that requests the content of a url once; used to boot the
     * preemptive loader by calling the configured preemptive-loader-url.
     */
    final static class PreemptiveBooter implements Runnable {

        private final String uri;

        public PreemptiveBooter( final String uri ) {
            this.uri = uri;
        }

        public void run() {
            try {
                URL url = new URL(this.uri);
                url.getContent();
            } catch (Exception ignore) {
                // best effort: a failing request is deliberately not reported
            }
        }
    }
}