1 17 18 package org.apache.james.transport; 19 20 import org.apache.avalon.cornerstone.services.threads.ThreadManager; 21 import org.apache.avalon.framework.activity.Disposable; 23 import org.apache.avalon.framework.activity.Initializable; 24 import org.apache.avalon.framework.component.ComponentException; 25 import org.apache.avalon.framework.component.ComponentManager; 26 import org.apache.avalon.framework.component.Composable; 27 import org.apache.avalon.framework.component.DefaultComponentManager; 28 import org.apache.avalon.framework.component.Component; 29 import org.apache.avalon.framework.configuration.Configurable; 30 import org.apache.avalon.framework.configuration.Configuration; 31 import org.apache.avalon.framework.configuration.ConfigurationException; 32 import org.apache.avalon.framework.logger.AbstractLogEnabled; 33 import org.apache.avalon.framework.context.Context; 34 import org.apache.avalon.framework.context.Contextualizable; 35 import org.apache.james.core.MailImpl; 36 import org.apache.james.services.MailStore; 37 import org.apache.james.services.SpoolRepository; 38 import org.apache.mailet.*; 39 40 import javax.mail.MessagingException ; 41 42 import java.util.Collection ; 43 import java.util.HashMap ; 44 import java.util.Iterator ; 45 46 54 public class JamesSpoolManager 55 extends AbstractLogEnabled 56 implements Composable, Configurable, Initializable, 57 Runnable , Disposable, Component, Contextualizable { 58 59 private Context context; 60 63 private final static boolean DEEP_DEBUG = false; 64 65 68 private DefaultComponentManager compMgr; 69 70 73 private Configuration conf; 74 75 private SpoolRepository spool; 76 77 private MailetContext mailetContext; 78 79 82 private HashMap processors; 83 84 87 private int numThreads; 88 89 99 101 104 106 109 private int numActive; 110 111 114 private boolean active; 115 116 119 private Collection spoolThreads; 120 121 124 public void compose(ComponentManager comp) 125 throws ComponentException { 126 compMgr = new DefaultComponentManager(comp); 128 } 129 130 133 public void configure(Configuration conf) throws ConfigurationException { 134 this.conf = conf; 135 numThreads = conf.getChild("threads").getValueAsInteger(1); 136 } 137 138 141 public void initialize() throws Exception { 142 143 getLogger().info("JamesSpoolManager init..."); 144 MailStore mailstore 146 = (MailStore) compMgr.lookup("org.apache.james.services.MailStore"); 147 spool = mailstore.getInboundSpool(); 148 if (null == spool) 149 { 150 String exceptionMessage = "The mailstore's inbound spool is null. The mailstore is misconfigured"; 151 if (getLogger().isErrorEnabled()) { 152 getLogger().error( exceptionMessage ); 153 } 154 throw new ConfigurationException(exceptionMessage); 155 } 156 if ((DEEP_DEBUG) && (getLogger().isDebugEnabled())) { 157 getLogger().debug("Got spool"); 158 } 159 160 mailetContext 161 = (MailetContext) compMgr.lookup("org.apache.mailet.MailetContext"); 162 MailetLoader mailetLoader = new MailetLoader(); 163 MatchLoader matchLoader = new MatchLoader(); 164 try { 165 mailetLoader.setLogger(getLogger()); 166 matchLoader.setLogger(getLogger()); 167 mailetLoader.contextualize(context); 168 matchLoader.contextualize(context); 169 mailetLoader.configure(conf.getChild("mailetpackages")); 170 matchLoader.configure(conf.getChild("matcherpackages")); 171 compMgr.put(Resources.MAILET_LOADER, mailetLoader); 172 compMgr.put(Resources.MATCH_LOADER, matchLoader); 173 } catch (ConfigurationException ce) { 174 final String message = 175 "Unable to configure mailet/matcher Loaders: " 176 + ce.getMessage(); 177 178 if (getLogger().isErrorEnabled()) { 179 getLogger().error( message, ce ); 180 } 181 throw new RuntimeException ( message ); 182 } 183 184 processors = new HashMap (); 186 187 final Configuration[] processorConfs = conf.getChildren( "processor" ); 188 for ( int i = 0; i < processorConfs.length; i++ ) 189 { 190 Configuration processorConf = processorConfs[i]; 191 String processorName = processorConf.getAttribute("name"); 192 try { 193 LinearProcessor processor = new LinearProcessor(); 194 setupLogger(processor, processorName); 195 processor.setSpool(spool); 196 processor.initialize(); 197 processors.put(processorName, processor); 198 199 if (processorName.equals("root")) { 202 Matcher matcher = matchLoader.getMatcher("All", 203 mailetContext); 204 Mailet mailet = mailetLoader.getMailet("PostmasterAlias", 205 mailetContext, null); 206 processor.add(matcher, mailet); 207 } 208 209 final Configuration[] mailetConfs 210 = processorConf.getChildren( "mailet" ); 211 for ( int j = 0; j < mailetConfs.length; j++ ) 215 { 216 Configuration c = mailetConfs[j]; 217 String mailetClassName = c.getAttribute("class"); 218 String matcherName = c.getAttribute("match"); 219 Mailet mailet = null; 220 Matcher matcher = null; 221 try { 222 matcher = matchLoader.getMatcher(matcherName, 223 mailetContext); 224 if (getLogger().isInfoEnabled()) { 226 StringBuffer infoBuffer = 227 new StringBuffer (64) 228 .append("Matcher ") 229 .append(matcherName) 230 .append(" instantiated."); 231 getLogger().info(infoBuffer.toString()); 232 } 233 } catch (MessagingException ex) { 234 if (getLogger().isErrorEnabled()) { 236 StringBuffer errorBuffer = 237 new StringBuffer (256) 238 .append("Unable to init matcher ") 239 .append(matcherName) 240 .append(": ") 241 .append(ex.toString()); 242 getLogger().error( errorBuffer.toString(), ex ); 243 if (ex.getNextException() != null) { 244 getLogger().error( "Caused by nested exception: ", ex.getNextException()); 245 } 246 } 247 System.err.println("Unable to init matcher " + matcherName); 248 System.err.println("Check spool manager logs for more details."); 249 throw ex; 251 } 252 try { 253 mailet = mailetLoader.getMailet(mailetClassName, 254 mailetContext, c); 255 if (getLogger().isInfoEnabled()) { 256 StringBuffer infoBuffer = 257 new StringBuffer (64) 258 .append("Mailet ") 259 .append(mailetClassName) 260 .append(" instantiated."); 261 getLogger().info(infoBuffer.toString()); 262 } 263 } catch (MessagingException ex) { 264 if (getLogger().isErrorEnabled()) { 266 StringBuffer errorBuffer = 267 new StringBuffer (256) 268 .append("Unable to init mailet ") 269 .append(mailetClassName) 270 .append(": ") 271 .append(ex.toString()); 272 getLogger().error( errorBuffer.toString(), ex ); 273 if (ex.getNextException() != null) { 274 getLogger().error( "Caused by nested exception: ", ex.getNextException()); 275 } 276 } 277 System.err.println("Unable to init mailet " + mailetClassName); 278 System.err.println("Check spool manager logs for more details."); 279 throw ex; 281 } 282 processor.add(matcher, mailet); 284 } 285 286 processor.closeProcessorLists(); 292 293 if (getLogger().isInfoEnabled()) { 294 StringBuffer infoBuffer = 295 new StringBuffer (64) 296 .append("Processor ") 297 .append(processorName) 298 .append(" instantiated."); 299 getLogger().info(infoBuffer.toString()); 300 } 301 } catch (Exception ex) { 302 if (getLogger().isErrorEnabled()) { 303 StringBuffer errorBuffer = 304 new StringBuffer (256) 305 .append("Unable to init processor ") 306 .append(processorName) 307 .append(": ") 308 .append(ex.toString()); 309 getLogger().error( errorBuffer.toString(), ex ); 310 } 311 throw ex; 312 } 313 } 314 if (getLogger().isInfoEnabled()) { 315 StringBuffer infoBuffer = 316 new StringBuffer (64) 317 .append("Spooler Manager uses ") 318 .append(numThreads) 319 .append(" Thread(s)"); 320 getLogger().info(infoBuffer.toString()); 321 } 322 323 active = true; 324 numActive = 0; 325 spoolThreads = new java.util.ArrayList (numThreads); 326 for ( int i = 0 ; i < numThreads ; i++ ) { 327 Thread reader = new Thread (this, "Spool Thread #" + i); 328 spoolThreads.add(reader); 329 reader.start(); 330 } 331 } 332 333 337 public void run() { 338 339 if (getLogger().isInfoEnabled()) 340 { 341 getLogger().info("Run JamesSpoolManager: " 342 + Thread.currentThread().getName()); 343 getLogger().info("Spool=" + spool.getClass().getName()); 344 } 345 346 numActive++; 347 while(active) { 348 String key = null; 349 try { 350 MailImpl mail = (MailImpl)spool.accept(); 351 key = mail.getName(); 352 if (getLogger().isDebugEnabled()) { 353 StringBuffer debugBuffer = 354 new StringBuffer (64) 355 .append("==== Begin processing mail ") 356 .append(mail.getName()) 357 .append("===="); 358 getLogger().debug(debugBuffer.toString()); 359 } 360 process(mail); 361 if ((Mail.GHOST.equals(mail.getState())) || 364 (mail.getRecipients() == null) || 365 (mail.getRecipients().size() == 0)) { 366 spool.remove(key); 367 if (getLogger().isDebugEnabled()) { 368 StringBuffer debugBuffer = 369 new StringBuffer (64) 370 .append("==== Removed from spool mail ") 371 .append(mail.getName()) 372 .append("===="); 373 getLogger().debug(debugBuffer.toString()); 374 } 375 } 376 else { 377 spool.unlock(key); 381 } 382 mail = null; 383 } catch (InterruptedException ie) { 384 getLogger().info("Interrupted JamesSpoolManager: " + Thread.currentThread().getName()); 385 } catch (Throwable e) { 386 if (getLogger().isErrorEnabled()) { 387 getLogger().error("Exception processing " + key + " in JamesSpoolManager.run " 388 + e.getMessage(), e); 389 } 390 404 } 405 } 406 if (getLogger().isInfoEnabled()) 407 { 408 getLogger().info("Stop JamesSpoolManager: " + Thread.currentThread().getName()); 409 } 410 numActive--; 411 } 412 413 419 protected void process(MailImpl mail) { 420 while (true) { 421 String processorName = mail.getState(); 422 if (processorName.equals(Mail.GHOST)) { 423 return; 425 } 426 try { 427 LinearProcessor processor 428 = (LinearProcessor)processors.get(processorName); 429 if (processor == null) { 430 StringBuffer exceptionMessageBuffer = 431 new StringBuffer (128) 432 .append("Unable to find processor ") 433 .append(processorName) 434 .append(" requested for processing of ") 435 .append(mail.getName()); 436 String exceptionMessage = exceptionMessageBuffer.toString(); 437 getLogger().debug(exceptionMessage); 438 mail.setState(Mail.ERROR); 439 throw new MailetException(exceptionMessage); 440 } 441 StringBuffer logMessageBuffer = null; 442 if (getLogger().isDebugEnabled()) { 443 logMessageBuffer = 444 new StringBuffer (64) 445 .append("Processing ") 446 .append(mail.getName()) 447 .append(" through ") 448 .append(processorName); 449 getLogger().debug(logMessageBuffer.toString()); 450 } 451 processor.service(mail); 452 if (getLogger().isDebugEnabled()) { 453 logMessageBuffer = 454 new StringBuffer (128) 455 .append("Processed ") 456 .append(mail.getName()) 457 .append(" through ") 458 .append(processorName); 459 getLogger().debug(logMessageBuffer.toString()); 460 getLogger().debug("Result was " + mail.getState()); 461 } 462 return; 463 } catch (Throwable e) { 464 StringBuffer exceptionBuffer = 467 new StringBuffer (64) 468 .append("Exception in processor <") 469 .append(processorName) 470 .append(">"); 471 getLogger().error(exceptionBuffer.toString(), e); 472 if (processorName.equals(Mail.ERROR)) { 473 mail.setState(Mail.GHOST); 476 mail.setErrorMessage(e.getMessage()); 477 } else { 478 if (!(e instanceof MessagingException )) { 480 mail.setState(Mail.ERROR); 482 } 483 mail.setErrorMessage(e.getMessage()); 484 } 485 } 486 if (getLogger().isErrorEnabled()) { 487 StringBuffer logMessageBuffer = 488 new StringBuffer (128) 489 .append("An error occurred processing ") 490 .append(mail.getName()) 491 .append(" through ") 492 .append(processorName); 493 getLogger().error(logMessageBuffer.toString()); 494 getLogger().error("Result was " + mail.getState()); 495 } 496 } 497 } 498 499 509 public void dispose() { 510 getLogger().info("JamesSpoolManager dispose..."); 511 active = false; for (Iterator it = spoolThreads.iterator(); it.hasNext(); ) { 513 ((Thread ) it.next()).interrupt(); } 515 516 long stop = System.currentTimeMillis() + 60000; 517 while (numActive != 0 && stop > System.currentTimeMillis()) { 519 try { 520 Thread.sleep(1000); 521 } catch (Exception ignored) {} 522 } 523 getLogger().info("JamesSpoolManager thread shutdown completed."); 524 525 Iterator it = processors.keySet().iterator(); 526 while (it.hasNext()) { 527 String processorName = (String )it.next(); 528 if (getLogger().isDebugEnabled()) { 529 getLogger().debug("Processor " + processorName); 530 } 531 LinearProcessor processor = (LinearProcessor)processors.get(processorName); 532 processor.dispose(); 533 processors.remove(processor); 534 } 535 } 536 537 540 public void contextualize(Context context) { 541 this.context = context; 542 } 543 } 544 | Popular Tags |