KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > james > transport > JamesSpoolManager


1 /***********************************************************************
2  * Copyright (c) 2000-2004 The Apache Software Foundation. *
3  * All rights reserved. *
4  * ------------------------------------------------------------------- *
5  * Licensed under the Apache License, Version 2.0 (the "License"); you *
6  * may not use this file except in compliance with the License. You *
7  * may obtain a copy of the License at: *
8  * *
9  * http://www.apache.org/licenses/LICENSE-2.0 *
10  * *
11  * Unless required by applicable law or agreed to in writing, software *
12  * distributed under the License is distributed on an "AS IS" BASIS, *
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or *
14  * implied. See the License for the specific language governing *
15  * permissions and limitations under the License. *
16  ***********************************************************************/

17
18 package org.apache.james.transport;
19
20 import org.apache.avalon.cornerstone.services.threads.ThreadManager;
21 //import org.apache.avalon.excalibur.thread.ThreadPool;
22
import org.apache.avalon.framework.activity.Disposable;
23 import org.apache.avalon.framework.activity.Initializable;
24 import org.apache.avalon.framework.component.ComponentException;
25 import org.apache.avalon.framework.component.ComponentManager;
26 import org.apache.avalon.framework.component.Composable;
27 import org.apache.avalon.framework.component.DefaultComponentManager;
28 import org.apache.avalon.framework.component.Component;
29 import org.apache.avalon.framework.configuration.Configurable;
30 import org.apache.avalon.framework.configuration.Configuration;
31 import org.apache.avalon.framework.configuration.ConfigurationException;
32 import org.apache.avalon.framework.logger.AbstractLogEnabled;
33 import org.apache.avalon.framework.context.Context;
34 import org.apache.avalon.framework.context.Contextualizable;
35 import org.apache.james.core.MailImpl;
36 import org.apache.james.services.MailStore;
37 import org.apache.james.services.SpoolRepository;
38 import org.apache.mailet.*;
39
40 import javax.mail.MessagingException JavaDoc;
41
42 import java.util.Collection JavaDoc;
43 import java.util.HashMap JavaDoc;
44 import java.util.Iterator JavaDoc;
45
46 /**
47  * Manages the mail spool. This class is responsible for retrieving
48  * messages from the spool, directing messages to the appropriate
49  * processor, and removing them from the spool when processing is
50  * complete.
51  *
52  * @version CVS $Revision: 1.20.4.15 $ $Date: 2004/04/14 04:36:15 $
53  */

54 public class JamesSpoolManager
55     extends AbstractLogEnabled
56     implements Composable, Configurable, Initializable,
57                Runnable JavaDoc, Disposable, Component, Contextualizable {
58
59     private Context context;
60     /**
61      * Whether 'deep debugging' is turned on.
62      */

63     private final static boolean DEEP_DEBUG = false;
64
65     /**
66      * System component manager
67      */

68     private DefaultComponentManager compMgr;
69
70     /**
71      * The configuration object used by this spool manager.
72      */

73     private Configuration conf;
74
75     private SpoolRepository spool;
76
77     private MailetContext mailetContext;
78
79     /**
80      * The map of processor names to processors
81      */

82     private HashMap JavaDoc processors;
83
84     /**
85      * The number of threads used to move mail through the spool.
86      */

87     private int numThreads;
88
89     /**
90      * The ThreadPool containing worker threads.
91      *
92      * This used to be used, but for threads that lived the entire
93      * lifespan of the application. Currently commented out. In
94      * the future, we could use a thread pool to run short-lived
95      * workers, so that we have a smaller number of readers that
96      * accept a message from the spool, and dispatch to a pool of
97      * worker threads that process the message.
98      */

99     // private ThreadPool workerPool;
100

101     /**
102      * The ThreadManager from which the thread pool is obtained.
103      */

104     // private ThreadManager threadManager;
105

106     /**
107      * Number of active threads
108      */

109     private int numActive;
110
111     /**
112      * Spool threads are active
113      */

114     private boolean active;
115
116     /**
117      * Spool threads
118      */

119     private Collection JavaDoc spoolThreads;
120
121     /**
122      * @see org.apache.avalon.framework.component.Composable#compose(ComponentManager)
123      */

124     public void compose(ComponentManager comp)
125         throws ComponentException {
126         // threadManager = (ThreadManager)comp.lookup( ThreadManager.ROLE );
127
compMgr = new DefaultComponentManager(comp);
128     }
129
130     /**
131      * @see org.apache.avalon.framework.configuration.Configurable#configure(Configuration)
132      */

133     public void configure(Configuration conf) throws ConfigurationException {
134         this.conf = conf;
135         numThreads = conf.getChild("threads").getValueAsInteger(1);
136     }
137
138     /**
139      * @see org.apache.avalon.framework.activity.Initializable#initialize()
140      */

141     public void initialize() throws Exception JavaDoc {
142
143         getLogger().info("JamesSpoolManager init...");
144         // workerPool = threadManager.getThreadPool( "default" );
145
MailStore mailstore
146             = (MailStore) compMgr.lookup("org.apache.james.services.MailStore");
147         spool = mailstore.getInboundSpool();
148         if (null == spool)
149         {
150             String JavaDoc exceptionMessage = "The mailstore's inbound spool is null. The mailstore is misconfigured";
151             if (getLogger().isErrorEnabled()) {
152                 getLogger().error( exceptionMessage );
153             }
154             throw new ConfigurationException(exceptionMessage);
155         }
156         if ((DEEP_DEBUG) && (getLogger().isDebugEnabled())) {
157             getLogger().debug("Got spool");
158         }
159
160         mailetContext
161             = (MailetContext) compMgr.lookup("org.apache.mailet.MailetContext");
162         MailetLoader mailetLoader = new MailetLoader();
163         MatchLoader matchLoader = new MatchLoader();
164         try {
165             mailetLoader.setLogger(getLogger());
166             matchLoader.setLogger(getLogger());
167             mailetLoader.contextualize(context);
168             matchLoader.contextualize(context);
169             mailetLoader.configure(conf.getChild("mailetpackages"));
170             matchLoader.configure(conf.getChild("matcherpackages"));
171             compMgr.put(Resources.MAILET_LOADER, mailetLoader);
172             compMgr.put(Resources.MATCH_LOADER, matchLoader);
173         } catch (ConfigurationException ce) {
174             final String JavaDoc message =
175                 "Unable to configure mailet/matcher Loaders: "
176                 + ce.getMessage();
177
178             if (getLogger().isErrorEnabled()) {
179                 getLogger().error( message, ce );
180             }
181             throw new RuntimeException JavaDoc( message );
182         }
183
184         //A processor is a Collection of
185
processors = new HashMap JavaDoc();
186
187         final Configuration[] processorConfs = conf.getChildren( "processor" );
188         for ( int i = 0; i < processorConfs.length; i++ )
189         {
190             Configuration processorConf = processorConfs[i];
191             String JavaDoc processorName = processorConf.getAttribute("name");
192             try {
193                 LinearProcessor processor = new LinearProcessor();
194                 setupLogger(processor, processorName);
195                 processor.setSpool(spool);
196                 processor.initialize();
197                 processors.put(processorName, processor);
198
199                 // If this is the root processor, add the PostmasterAlias
200
// mailet silently to the top
201
if (processorName.equals("root")) {
202                     Matcher matcher = matchLoader.getMatcher("All",
203                                                              mailetContext);
204                     Mailet mailet = mailetLoader.getMailet("PostmasterAlias",
205                                                            mailetContext, null);
206                     processor.add(matcher, mailet);
207                 }
208
209                 final Configuration[] mailetConfs
210                     = processorConf.getChildren( "mailet" );
211                 // Loop through the mailet configuration, load
212
// all of the matcher and mailets, and add
213
// them to the processor.
214
for ( int j = 0; j < mailetConfs.length; j++ )
215                 {
216                     Configuration c = mailetConfs[j];
217                     String JavaDoc mailetClassName = c.getAttribute("class");
218                     String JavaDoc matcherName = c.getAttribute("match");
219                     Mailet mailet = null;
220                     Matcher matcher = null;
221                     try {
222                         matcher = matchLoader.getMatcher(matcherName,
223                                                          mailetContext);
224                         //The matcher itself should log that it's been inited.
225
if (getLogger().isInfoEnabled()) {
226                             StringBuffer JavaDoc infoBuffer =
227                                 new StringBuffer JavaDoc(64)
228                                         .append("Matcher ")
229                                         .append(matcherName)
230                                         .append(" instantiated.");
231                             getLogger().info(infoBuffer.toString());
232                         }
233                     } catch (MessagingException JavaDoc ex) {
234                         // **** Do better job printing out exception
235
if (getLogger().isErrorEnabled()) {
236                             StringBuffer JavaDoc errorBuffer =
237                                 new StringBuffer JavaDoc(256)
238                                         .append("Unable to init matcher ")
239                                         .append(matcherName)
240                                         .append(": ")
241                                         .append(ex.toString());
242                             getLogger().error( errorBuffer.toString(), ex );
243                 if (ex.getNextException() != null) {
244                 getLogger().error( "Caused by nested exception: ", ex.getNextException());
245                 }
246                         }
247                         System.err.println("Unable to init matcher " + matcherName);
248                         System.err.println("Check spool manager logs for more details.");
249                         //System.exit(1);
250
throw ex;
251                     }
252                     try {
253                         mailet = mailetLoader.getMailet(mailetClassName,
254                                                         mailetContext, c);
255                         if (getLogger().isInfoEnabled()) {
256                             StringBuffer JavaDoc infoBuffer =
257                                 new StringBuffer JavaDoc(64)
258                                         .append("Mailet ")
259                                         .append(mailetClassName)
260                                         .append(" instantiated.");
261                             getLogger().info(infoBuffer.toString());
262                         }
263                     } catch (MessagingException JavaDoc ex) {
264                         // **** Do better job printing out exception
265
if (getLogger().isErrorEnabled()) {
266                             StringBuffer JavaDoc errorBuffer =
267                                 new StringBuffer JavaDoc(256)
268                                         .append("Unable to init mailet ")
269                                         .append(mailetClassName)
270                                         .append(": ")
271                                         .append(ex.toString());
272                             getLogger().error( errorBuffer.toString(), ex );
273                 if (ex.getNextException() != null) {
274                 getLogger().error( "Caused by nested exception: ", ex.getNextException());
275                 }
276                         }
277                         System.err.println("Unable to init mailet " + mailetClassName);
278                         System.err.println("Check spool manager logs for more details.");
279                         //System.exit(1);
280
throw ex;
281                     }
282                     //Add this pair to the processor
283
processor.add(matcher, mailet);
284                 }
285
286                 // Close the processor matcher/mailet lists.
287
//
288
// Please note that this is critical to the proper operation
289
// of the LinearProcessor code. The processor will not be
290
// able to service mails until this call is made.
291
processor.closeProcessorLists();
292
293                 if (getLogger().isInfoEnabled()) {
294                     StringBuffer JavaDoc infoBuffer =
295                         new StringBuffer JavaDoc(64)
296                                 .append("Processor ")
297                                 .append(processorName)
298                                 .append(" instantiated.");
299                     getLogger().info(infoBuffer.toString());
300                 }
301             } catch (Exception JavaDoc ex) {
302                 if (getLogger().isErrorEnabled()) {
303                     StringBuffer JavaDoc errorBuffer =
304                        new StringBuffer JavaDoc(256)
305                                .append("Unable to init processor ")
306                                .append(processorName)
307                                .append(": ")
308                                .append(ex.toString());
309                     getLogger().error( errorBuffer.toString(), ex );
310                 }
311                 throw ex;
312             }
313         }
314         if (getLogger().isInfoEnabled()) {
315             StringBuffer JavaDoc infoBuffer =
316                 new StringBuffer JavaDoc(64)
317                     .append("Spooler Manager uses ")
318                     .append(numThreads)
319                     .append(" Thread(s)");
320             getLogger().info(infoBuffer.toString());
321         }
322
323         active = true;
324         numActive = 0;
325         spoolThreads = new java.util.ArrayList JavaDoc(numThreads);
326         for ( int i = 0 ; i < numThreads ; i++ ) {
327             Thread JavaDoc reader = new Thread JavaDoc(this, "Spool Thread #" + i);
328             spoolThreads.add(reader);
329             reader.start();
330         }
331     }
332
333     /**
334      * This routinely checks the message spool for messages, and processes
335      * them as necessary
336      */

337     public void run() {
338
339         if (getLogger().isInfoEnabled())
340         {
341             getLogger().info("Run JamesSpoolManager: "
342                              + Thread.currentThread().getName());
343             getLogger().info("Spool=" + spool.getClass().getName());
344         }
345
346         numActive++;
347         while(active) {
348             String JavaDoc key = null;
349             try {
350                 MailImpl mail = (MailImpl)spool.accept();
351                 key = mail.getName();
352                 if (getLogger().isDebugEnabled()) {
353                     StringBuffer JavaDoc debugBuffer =
354                         new StringBuffer JavaDoc(64)
355                                 .append("==== Begin processing mail ")
356                                 .append(mail.getName())
357                                 .append("====");
358                     getLogger().debug(debugBuffer.toString());
359                 }
360                 process(mail);
361                 // Only remove an email from the spool is processing is
362
// complete, or if it has no recipients
363
if ((Mail.GHOST.equals(mail.getState())) ||
364                     (mail.getRecipients() == null) ||
365                     (mail.getRecipients().size() == 0)) {
366                     spool.remove(key);
367                     if (getLogger().isDebugEnabled()) {
368                         StringBuffer JavaDoc debugBuffer =
369                             new StringBuffer JavaDoc(64)
370                                     .append("==== Removed from spool mail ")
371                                     .append(mail.getName())
372                                     .append("====");
373                         getLogger().debug(debugBuffer.toString());
374                     }
375                 }
376                 else {
377                     // spool.remove() has a side-effect! It unlocks the
378
// message so that other threads can work on it! If
379
// we don't remove it, we must unlock it!
380
spool.unlock(key);
381                 }
382                 mail = null;
383             } catch (InterruptedException JavaDoc ie) {
384                 getLogger().info("Interrupted JamesSpoolManager: " + Thread.currentThread().getName());
385             } catch (Throwable JavaDoc e) {
386                 if (getLogger().isErrorEnabled()) {
387                     getLogger().error("Exception processing " + key + " in JamesSpoolManager.run "
388                                       + e.getMessage(), e);
389                 }
390                 /* Move the mail to ERROR state? If we do, it could be
391                  * deleted if an error occurs in the ERROR processor.
392                  * Perhaps the answer is to resolve that issue by
393                  * having a special state for messages that are not to
394                  * be processed, but aren't to be deleted? The message
395                  * would already be in the spool, but would not be
396                  * touched again.
397                 if (mail != null) {
398                     try {
399                         mail.setState(Mail.ERROR);
400                         spool.store(mail);
401                     }
402                 }
403                 */

404             }
405         }
406         if (getLogger().isInfoEnabled())
407         {
408             getLogger().info("Stop JamesSpoolManager: " + Thread.currentThread().getName());
409         }
410         numActive--;
411     }
412
413     /**
414      * Process this mail message by the appropriate processor as designated
415      * in the state of the Mail object.
416      *
417      * @param mail the mail message to be processed
418      */

419     protected void process(MailImpl mail) {
420         while (true) {
421             String JavaDoc processorName = mail.getState();
422             if (processorName.equals(Mail.GHOST)) {
423                 //This message should disappear
424
return;
425             }
426             try {
427                 LinearProcessor processor
428                     = (LinearProcessor)processors.get(processorName);
429                 if (processor == null) {
430                     StringBuffer JavaDoc exceptionMessageBuffer =
431                         new StringBuffer JavaDoc(128)
432                             .append("Unable to find processor ")
433                             .append(processorName)
434                             .append(" requested for processing of ")
435                             .append(mail.getName());
436                     String JavaDoc exceptionMessage = exceptionMessageBuffer.toString();
437                     getLogger().debug(exceptionMessage);
438                     mail.setState(Mail.ERROR);
439                     throw new MailetException(exceptionMessage);
440                 }
441                 StringBuffer JavaDoc logMessageBuffer = null;
442                 if (getLogger().isDebugEnabled()) {
443                     logMessageBuffer =
444                         new StringBuffer JavaDoc(64)
445                                 .append("Processing ")
446                                 .append(mail.getName())
447                                 .append(" through ")
448                                 .append(processorName);
449                     getLogger().debug(logMessageBuffer.toString());
450                 }
451                 processor.service(mail);
452                 if (getLogger().isDebugEnabled()) {
453                     logMessageBuffer =
454                         new StringBuffer JavaDoc(128)
455                                 .append("Processed ")
456                                 .append(mail.getName())
457                                 .append(" through ")
458                                 .append(processorName);
459                     getLogger().debug(logMessageBuffer.toString());
460                     getLogger().debug("Result was " + mail.getState());
461                 }
462                 return;
463             } catch (Throwable JavaDoc e) {
464                 // This is a strange error situation that shouldn't ordinarily
465
// happen
466
StringBuffer JavaDoc exceptionBuffer =
467                     new StringBuffer JavaDoc(64)
468                             .append("Exception in processor <")
469                             .append(processorName)
470                             .append(">");
471                 getLogger().error(exceptionBuffer.toString(), e);
472                 if (processorName.equals(Mail.ERROR)) {
473                     // We got an error on the error processor...
474
// kill the message
475
mail.setState(Mail.GHOST);
476                     mail.setErrorMessage(e.getMessage());
477                 } else {
478                     //We got an error... send it to the requested processor
479
if (!(e instanceof MessagingException JavaDoc)) {
480                         //We got an error... send it to the error processor
481
mail.setState(Mail.ERROR);
482                     }
483                     mail.setErrorMessage(e.getMessage());
484                 }
485             }
486             if (getLogger().isErrorEnabled()) {
487                 StringBuffer JavaDoc logMessageBuffer =
488                     new StringBuffer JavaDoc(128)
489                             .append("An error occurred processing ")
490                             .append(mail.getName())
491                             .append(" through ")
492                             .append(processorName);
493                 getLogger().error(logMessageBuffer.toString());
494                 getLogger().error("Result was " + mail.getState());
495             }
496         }
497     }
498
499     /**
500      * The dispose operation is called at the end of a components lifecycle.
501      * Instances of this class use this method to release and destroy any
502      * resources that they own.
503      *
504      * This implementation shuts down the LinearProcessors managed by this
505      * JamesSpoolManager
506      *
507      * @throws Exception if an error is encountered during shutdown
508      */

509     public void dispose() {
510         getLogger().info("JamesSpoolManager dispose...");
511         active = false; // shutdown the threads
512
for (Iterator JavaDoc it = spoolThreads.iterator(); it.hasNext(); ) {
513             ((Thread JavaDoc) it.next()).interrupt(); // interrupt any waiting accept() calls.
514
}
515
516         long stop = System.currentTimeMillis() + 60000;
517         // give the spooler threads one minute to terminate gracefully
518
while (numActive != 0 && stop > System.currentTimeMillis()) {
519             try {
520                 Thread.sleep(1000);
521             } catch (Exception JavaDoc ignored) {}
522         }
523         getLogger().info("JamesSpoolManager thread shutdown completed.");
524
525         Iterator JavaDoc it = processors.keySet().iterator();
526         while (it.hasNext()) {
527             String JavaDoc processorName = (String JavaDoc)it.next();
528             if (getLogger().isDebugEnabled()) {
529                 getLogger().debug("Processor " + processorName);
530             }
531             LinearProcessor processor = (LinearProcessor)processors.get(processorName);
532             processor.dispose();
533             processors.remove(processor);
534         }
535     }
536
537     /**
538      * @see org.apache.avalon.framework.context.Contextualizable#contextualize(Context)
539      */

540     public void contextualize(Context context) {
541         this.context = context;
542     }
543 }
544
Popular Tags