// KickJava   Java API By Example, From Geeks To Geeks.
//
// Java > Open Source Codes > org > apache > james > transport > LinearProcessor


/***********************************************************************
 * Copyright (c) 2000-2004 The Apache Software Foundation.             *
 * All rights reserved.                                                *
 * ------------------------------------------------------------------- *
 * Licensed under the Apache License, Version 2.0 (the "License"); you *
 * may not use this file except in compliance with the License. You    *
 * may obtain a copy of the License at:                                *
 *                                                                     *
 *     http://www.apache.org/licenses/LICENSE-2.0                      *
 *                                                                     *
 * Unless required by applicable law or agreed to in writing, software *
 * distributed under the License is distributed on an "AS IS" BASIS,   *
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or     *
 * implied. See the License for the specific language governing        *
 * permissions and limitations under the License.                      *
 ***********************************************************************/

package org.apache.james.transport;

import org.apache.avalon.framework.activity.Initializable;
import org.apache.avalon.framework.activity.Disposable;
import org.apache.avalon.framework.logger.AbstractLogEnabled;
import org.apache.avalon.framework.logger.Logger;
import org.apache.james.core.MailImpl;
import org.apache.james.core.MailetConfigImpl;
import org.apache.james.services.SpoolRepository;
import org.apache.mailet.*;

import javax.mail.MessagingException;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Random;
import java.util.Iterator;
import java.util.Locale;

39 /**
40  * Implements a processor for mails, directing the mail down
41  * the chain of matchers/mailets.
42  *
43  * SAMPLE CONFIGURATION
44  * <processor name="try" onerror="return,log">
45  * <mailet match="RecipientIsLocal" class="LocalDelivery">
46  * </mailet>
47  * <mailet match="All" class="RemoteDelivery">
48  * <delayTime>21600000</delayTime>
49  * <maxRetries>5</maxRetries>
50  * </mailet>
51  * </processor>
52  *
53  * Note that the 'onerror' attribute is not yet supported.
54  *
55  * As of James v2.2.0a5, 'onerror' functionality is implemented, but
56  * it is implemented on the <mailet> tag. The specification is:
57  *
58  * <mailet match="..." class="..."
59  * [onMatchException="{noMatch|matchAll|error|<aProcessorName>}"]
60  * [onMailetException="{ignore|error|<aProcessorName>}"]>
61  *
62  * noMatch: no addresses are considered to match
63  * matchAll: all addresses are considered to match
64  * error: as before, send the message to the ERROR processor
65  *
66  * Otherwise, a processor name can be specified, and the message will
67  * be sent there.
68  *
69  * <P>CVS $Id: LinearProcessor.java,v 1.10.4.7 2004/04/14 06:41:56 noel Exp $</P>
70  * @version 2.2.0
71  */

72 public class LinearProcessor
73     extends AbstractLogEnabled
74     implements Initializable, Disposable {
75
76     private static final Random JavaDoc random = new Random JavaDoc(); // Used to generate new mail names
77

78     /**
79      * The name of the matcher used to terminate the matcher chain. The
80      * end of the matcher/mailet chain must be a matcher that matches
81      * all mails and a mailet that sets every mail to GHOST status.
82      * This is necessary to ensure that mails are removed from the spool
83      * in an orderly fashion.
84      */

85     private static final String JavaDoc TERMINATING_MATCHER_NAME = "Terminating%Matcher%Name";
86
87     /**
88      * The name of the mailet used to terminate the mailet chain. The
89      * end of the matcher/mailet chain must be a matcher that matches
90      * all mails and a mailet that sets every mail to GHOST status.
91      * This is necessary to ensure that mails are removed from the spool
92      * in an orderly fashion.
93      */

94     private static final String JavaDoc TERMINATING_MAILET_NAME = "Terminating%Mailet%Name";
95
96     private List JavaDoc mailets; // The list of mailets for this processor
97
private List JavaDoc matchers; // The list of matchers for this processor
98
private volatile boolean listsClosed; // Whether the matcher/mailet lists have been closed.
99
private SpoolRepository spool; // The spool on which this processor is acting
100

101     /**
102      * Set the spool to be used by this LinearProcessor.
103      *
104      * @param spool the spool to be used by this processor
105      *
106      * @throws IllegalArgumentException when the spool passed in is null
107      */

108     public void setSpool(SpoolRepository spool) {
109         if (spool == null) {
110             throw new IllegalArgumentException JavaDoc("The spool cannot be null");
111         }
112         this.spool = spool;
113     }
114
115     /**
116      * @see org.apache.avalon.framework.activity.Initializable#initialize()
117      */

118     public void initialize() {
119         matchers = new ArrayList JavaDoc();
120         mailets = new ArrayList JavaDoc();
121     }
122
123     /**
124      * <p>The dispose operation is called at the end of a components lifecycle.
125      * Instances of this class use this method to release and destroy any
126      * resources that they own.</p>
127      *
128      * <p>This implementation disposes of all the mailet instances added to the
129      * processor</p>
130      *
131      * @throws Exception if an error is encountered during shutdown
132      */

133     public void dispose() {
134         Iterator JavaDoc it = mailets.iterator();
135         boolean debugEnabled = getLogger().isDebugEnabled();
136         while (it.hasNext()) {
137             Mailet mailet = (Mailet)it.next();
138             if (debugEnabled) {
139                 getLogger().debug("Shutdown mailet " + mailet.getMailetInfo());
140             }
141             mailet.destroy();
142         }
143     }
144
145     /**
146      * <p>Adds a new <code>Matcher</code> / <code>Mailet</code> pair
147      * to the processor. Checks to ensure that the matcher and
148      * mailet passed in are not null. Synchronized to ensure that
149      * the matchers and mailets are kept in sync.</p>
150      *
151      * <p>It is an essential part of the contract of the LinearProcessor
152      * that a particular matcher/mailet combination be used to
153      * terminate the processor chain. This is done by calling the
154      * closeProcessorList method.</p>
155      *
156      * <p>Once the closeProcessorList has been called any subsequent
157      * call to the add method will result in an IllegalStateException.</p>
158      *
159      * <p>This method is synchronized to protect against corruption of
160      * matcher/mailets lists</p>
161      *
162      * @param matcher the new matcher being added
163      * @param mailet the new mailet being added
164      *
165      * @throws IllegalArgumentException when the matcher or mailet passed in is null
166      * @throws IllegalStateException when this method is called after the processor lists have been closed
167      */

168     public synchronized void add(Matcher matcher, Mailet mailet) {
169         if (matcher == null) {
170             throw new IllegalArgumentException JavaDoc("Null valued matcher passed to LinearProcessor.");
171         }
172         if (mailet == null) {
173             throw new IllegalArgumentException JavaDoc("Null valued mailet passed to LinearProcessor.");
174         }
175         if (listsClosed) {
176             throw new IllegalStateException JavaDoc("Attempt to add matcher/mailet after lists have been closed");
177         }
178         matchers.add(matcher);
179         mailets.add(mailet);
180     }
181
182     /**
183      * <p>Closes the processor matcher/mailet list.</p>
184      *
185      * <p>This method is synchronized to protect against corruption of
186      * matcher/mailets lists</p>
187      *
188      * @throws IllegalStateException when this method is called after the processor lists have been closed
189      */

190     public synchronized void closeProcessorLists() {
191         if (listsClosed) {
192             throw new IllegalStateException JavaDoc("Processor's matcher/mailet lists have already been closed.");
193         }
194         Matcher terminatingMatcher =
195             new GenericMatcher() {
196                 public Collection JavaDoc match(Mail mail) {
197                     return mail.getRecipients();
198                 }
199             
200                 public String JavaDoc getMatcherInfo() {
201                     return TERMINATING_MATCHER_NAME;
202                 }
203             };
204         Mailet terminatingMailet =
205             new GenericMailet() {
206                 public void service(Mail mail) {
207                     if (!(Mail.ERROR.equals(mail.getState()))) {
208                         // Don't complain if we fall off the end of the
209
// error processor. That is currently the
210
// normal situation for James, and the message
211
// will show up in the error store.
212
StringBuffer JavaDoc warnBuffer = new StringBuffer JavaDoc(256)
213                                               .append("Message ")
214                                               .append(((MailImpl)mail).getName())
215                                               .append(" reached the end of this processor, and is automatically deleted. This may indicate a configuration error.");
216                         LinearProcessor.this.getLogger().warn(warnBuffer.toString());
217                     }
218                     mail.setState(Mail.GHOST);
219                 }
220             
221                 public String JavaDoc getMailetInfo() {
222                     return getMailetName();
223                 }
224             
225                 public String JavaDoc getMailetName() {
226                     return TERMINATING_MAILET_NAME;
227                 }
228             };
229         add(terminatingMatcher, terminatingMailet);
230         listsClosed = true;
231     }
232
233     /**
234      * <p>Processes a single mail message through the chain of matchers and mailets.</p>
235      *
236      * <p>Calls to this method before setSpool has been called with a non-null argument
237      * will result in an <code>IllegalStateException</code>.</p>
238      *
239      * <p>If the matcher/mailet lists have not been closed by a call to the closeProcessorLists
240      * method then a call to this method will result in an <code>IllegalStateException</code>.
241      * The end of the matcher/mailet chain must be a matcher that matches all mails and
242      * a mailet that sets every mail to GHOST status. This is necessary to ensure that
243      * mails are removed from the spool in an orderly fashion. The closeProcessorLists method
244      * ensures this.</p>
245      *
246      * @param mail the new mail to be processed
247      *
248      * @throws IllegalStateException when this method is called before the processor lists have been closed
249      * or the spool has been initialized
250      */

251     public void service(MailImpl mail) throws MessagingException JavaDoc {
252         if (spool == null) {
253             throw new IllegalStateException JavaDoc("Attempt to service mail before the spool has been set to a non-null value");
254         }
255
256         if (!listsClosed) {
257             throw new IllegalStateException JavaDoc("Attempt to service mail before matcher/mailet lists have been closed");
258         }
259
260         if (getLogger().isDebugEnabled()) {
261             getLogger().debug("Servicing mail: " + mail.getName());
262         }
263         // unprocessed is an array of Lists of Mail objects
264
// the array indicates which matcher/mailet (stage in the linear
265
// processor) that this Mail needs to be processed.
266
// e.g., a Mail in unprocessed[0] needs to be
267
// processed by the first matcher/mailet.
268
//
269
// It is a List of Mail objects at each array spot as multiple Mail
270
// objects could be at the same stage.
271
//
272
// Note that every Mail object in this array will either be the
273
// original Mail object passed in, or a result of this method's
274
// (and hence this thread's) processing.
275

276         List JavaDoc[] unprocessed = new List JavaDoc[matchers.size() + 1];
277
278         for (int i = 0; i < unprocessed.length; i++) {
279             // No need to use synchronization, as this is totally
280
// local to the method
281
unprocessed[i] = new ArrayList JavaDoc();
282         }
283
284         //Add the object to the bottom of the list
285
unprocessed[0].add(mail);
286
287         //This is the original state of the message
288
String JavaDoc originalState = mail.getState();
289
290         //We'll use these as temporary variables in the loop
291
mail = null; // the message we're currently processing
292
int i = 0; // where in the stage we're looking
293
while (true) {
294             // The last element in the unprocessed array has mail messages
295
// that have completed all stages. We want them to just die,
296
// so we clear that spot to allow garbage collection of the
297
// objects.
298
//
299
// Please note that the presence of the terminating mailet at the end
300
// of the chain is critical to the proper operation
301
// of the LinearProcessor code. If this mailet is not placed
302
// at the end of the chain with a terminating matcher, there is a
303
// potential for configuration or implementation errors to
304
// lead to mails trapped in the spool. This matcher/mailet
305
// combination is added when the closeProcessorList method
306
// is called.
307
unprocessed[unprocessed.length - 1].clear();
308
309             //initialize the mail reference we will be searching on
310
mail = null;
311
312             //Scan through all stages, trying to find a message to process
313
for (i = 0; i < unprocessed.length; i++) {
314                 if (unprocessed[i].size() > 0) {
315                     //Get the first element from the queue, and remove it from there
316
mail = (MailImpl)unprocessed[i].remove(0);
317                     break;
318                 }
319             }
320
321             //Check it we found anything
322
if (mail == null) {
323                 //We found no messages to process... we're done servicing the mail object
324
return;
325             }
326
327
328             //Call the matcher and find what recipients match
329
Collection JavaDoc recipients = null;
330             Matcher matcher = (Matcher) matchers.get(i);
331             StringBuffer JavaDoc logMessageBuffer = null;
332             if (getLogger().isDebugEnabled()) {
333                 logMessageBuffer =
334                     new StringBuffer JavaDoc(128)
335                             .append("Checking ")
336                             .append(mail.getName())
337                             .append(" with ")
338                             .append(matcher);
339                 getLogger().debug(logMessageBuffer.toString());
340             }
341             try {
342                 recipients = matcher.match(mail);
343                 if (recipients == null) {
344                     //In case the matcher returned null, create an empty Collection
345
recipients = new ArrayList JavaDoc(0);
346                 } else if (recipients != mail.getRecipients()) {
347                     //Make sure all the objects are MailAddress objects
348
verifyMailAddresses(recipients);
349                 }
350             } catch (MessagingException JavaDoc me) {
351                 // look in the matcher's mailet's init attributes
352
MailetConfig mailetConfig = ((Mailet) mailets.get(i)).getMailetConfig();
353                 String JavaDoc onMatchException = ((MailetConfigImpl) mailetConfig).getInitAttribute("onMatchException");
354                 if (onMatchException == null) {
355                     onMatchException = Mail.ERROR;
356                 } else {
357                     onMatchException = onMatchException.trim().toLowerCase(Locale.US);
358                 }
359                 if (onMatchException.compareTo("nomatch") == 0) {
360                     //In case the matcher returned null, create an empty Collection
361
recipients = new ArrayList JavaDoc(0);
362                 } else if (onMatchException.compareTo("matchall") == 0) {
363                     recipients = mail.getRecipients();
364                     // no need to verify addresses
365
} else {
366                     handleException(me, mail, matcher.getMatcherConfig().getMatcherName(), onMatchException);
367                 }
368             }
369
370             // Split the recipients into two pools. notRecipients will contain the
371
// recipients on the message that the matcher did not return.
372
Collection JavaDoc notRecipients;
373             if (recipients == mail.getRecipients() || recipients.size() == 0) {
374                 notRecipients = new ArrayList JavaDoc(0);
375             } else {
376                 notRecipients = new ArrayList JavaDoc(mail.getRecipients());
377                 notRecipients.removeAll(recipients);
378             }
379
380             if (recipients.size() == 0) {
381                 //Everything was not a match... store it in the next spot in the array
382
unprocessed[i + 1].add(mail);
383                 continue;
384             }
385             if (notRecipients.size() != 0) {
386                 // There are a mix of recipients and not recipients.
387
// We need to clone this message, put the notRecipients on the clone
388
// and store it in the next spot
389
MailImpl notMail = (MailImpl)mail.duplicate(newName(mail));
390                 notMail.setRecipients(notRecipients);
391                 unprocessed[i + 1].add(notMail);
392                 //We have to set the reduce possible recipients on the old message
393
mail.setRecipients(recipients);
394             }
395             // We have messages that need to process... time to run the mailet.
396
Mailet mailet = (Mailet) mailets.get(i);
397             if (getLogger().isDebugEnabled()) {
398                 logMessageBuffer =
399                     new StringBuffer JavaDoc(128)
400                             .append("Servicing ")
401                             .append(mail.getName())
402                             .append(" by ")
403                             .append(mailet.getMailetInfo());
404                 getLogger().debug(logMessageBuffer.toString());
405             }
406             try {
407                 mailet.service(mail);
408                 // Make sure all the recipients are still MailAddress objects
409
verifyMailAddresses(mail.getRecipients());
410             } catch (MessagingException JavaDoc me) {
411                 MailetConfig mailetConfig = mailet.getMailetConfig();
412                 String JavaDoc onMailetException = ((MailetConfigImpl) mailetConfig).getInitAttribute("onMailetException");
413                 if (onMailetException == null) {
414                     onMailetException = Mail.ERROR;
415                 } else {
416                     onMailetException = onMailetException.trim().toLowerCase(Locale.US);
417                 }
418                 if (onMailetException.compareTo("ignore") == 0) {
419                     // ignore the exception and continue
420
// this option should not be used if the mail object can be changed by the mailet
421
verifyMailAddresses(mail.getRecipients());
422                 } else {
423                     handleException(me, mail, mailet.getMailetConfig().getMailetName(), onMailetException);
424                 }
425             }
426
427             // See if the state was changed by the mailet
428
if (!mail.getState().equals(originalState)) {
429                 //If this message was ghosted, we just want to let it die
430
if (mail.getState().equals(Mail.GHOST)) {
431                     // let this instance die...
432
mail = null;
433                     continue;
434                 }
435                 // This was just set to another state requiring further processing...
436
// Store this back in the spool and it will get picked up and
437
// run in that processor
438
spool.store(mail);
439                 mail = null;
440                 continue;
441             } else {
442                 // Ok, we made it through with the same state... move it to the next
443
// spot in the array
444
unprocessed[i + 1].add(mail);
445             }
446
447         }
448     }
449
450     /**
451      * Create a unique new primary key name.
452      *
453      * @param mail the mail to use as the basis for the new mail name
454      *
455      * @return a new name
456      */

457     private String JavaDoc newName(MailImpl mail) {
458         StringBuffer JavaDoc nameBuffer =
459             new StringBuffer JavaDoc(64)
460                     .append(mail.getName())
461                     .append("-!")
462                     .append(random.nextInt(1048576));
463         return nameBuffer.toString();
464     }
465
466
467
468     /**
469      * Checks that all objects in this class are of the form MailAddress.
470      *
471      * @throws MessagingException when the <code>Collection</code> contains objects that are not <code>MailAddress</code> objects
472      */

473     private void verifyMailAddresses(Collection JavaDoc col) throws MessagingException JavaDoc {
474         try {
475             MailAddress addresses[] = (MailAddress[])col.toArray(new MailAddress[0]);
476
477             // Why is this here? According to the javadoc for
478
// java.util.Collection.toArray(Object[]), this should
479
// never happen. The exception will be thrown.
480
if (addresses.length != col.size()) {
481                 throw new MailetException("The recipient list contains objects other than MailAddress objects");
482             }
483         } catch (ArrayStoreException JavaDoc ase) {
484             throw new MailetException("The recipient list contains objects other than MailAddress objects");
485         }
486     }
487
488     /**
489      * This is a helper method that updates the state of the mail object to
490      * Mail.ERROR as well as recording the exception to the log
491      *
492      * @param me the exception to be handled
493      * @param mail the mail being processed when the exception was generated
494      * @param offendersName the matcher or mailet than generated the exception
495      * @param nextState the next state to set
496      *
497      * @throws MessagingException thrown always, rethrowing the passed in exception
498      */

499     private void handleException(MessagingException JavaDoc me, Mail mail, String JavaDoc offendersName, String JavaDoc nextState) throws MessagingException JavaDoc {
500         System.err.println("exception! " + me);
501         mail.setState(nextState);
502         StringWriter JavaDoc sout = new StringWriter JavaDoc();
503         PrintWriter JavaDoc out = new PrintWriter JavaDoc(sout, true);
504         StringBuffer JavaDoc exceptionBuffer =
505             new StringBuffer JavaDoc(128)
506                     .append("Exception calling ")
507                     .append(offendersName)
508                     .append(": ")
509                     .append(me.getMessage());
510         out.println(exceptionBuffer.toString());
511         Exception JavaDoc e = me;
512         while (e != null) {
513             e.printStackTrace(out);
514             if (e instanceof MessagingException JavaDoc) {
515                 e = ((MessagingException JavaDoc)e).getNextException();
516             } else {
517                 e = null;
518             }
519         }
520         String JavaDoc errorString = sout.toString();
521         mail.setErrorMessage(errorString);
522         getLogger().error(errorString);
523         throw me;
524     }
525 }
// Popular Tags