KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > james > nntpserver > repository > NNTPSpooler


1 /***********************************************************************
2  * Copyright (c) 2000-2004 The Apache Software Foundation. *
3  * All rights reserved. *
4  * ------------------------------------------------------------------- *
5  * Licensed under the Apache License, Version 2.0 (the "License"); you *
6  * may not use this file except in compliance with the License. You *
7  * may obtain a copy of the License at: *
8  * *
9  * http://www.apache.org/licenses/LICENSE-2.0 *
10  * *
11  * Unless required by applicable law or agreed to in writing, software *
12  * distributed under the License is distributed on an "AS IS" BASIS, *
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or *
14  * implied. See the License for the specific language governing *
15  * permissions and limitations under the License. *
16  ***********************************************************************/

17
18 package org.apache.james.nntpserver.repository;
19
20 import org.apache.avalon.excalibur.io.IOUtil;
21 import org.apache.avalon.framework.activity.Initializable;
22 import org.apache.avalon.framework.configuration.Configurable;
23 import org.apache.avalon.framework.configuration.Configuration;
24 import org.apache.avalon.framework.configuration.ConfigurationException;
25 import org.apache.avalon.framework.context.Context;
26 import org.apache.avalon.framework.context.ContextException;
27 import org.apache.avalon.framework.context.Contextualizable;
28 import org.apache.avalon.framework.logger.AbstractLogEnabled;
29 import org.apache.avalon.framework.logger.LogEnabled;
30 import org.apache.james.context.AvalonContextUtilities;
31 import org.apache.james.util.Lock;
32
33 import javax.mail.internet.MimeMessage JavaDoc;
34 import java.io.BufferedReader JavaDoc;
35 import java.io.File JavaDoc;
36 import java.io.FileInputStream JavaDoc;
37 import java.io.FileOutputStream JavaDoc;
38 import java.io.InputStreamReader JavaDoc;
39 import java.io.IOException JavaDoc;
40 import java.util.Properties JavaDoc;
41 import java.util.StringTokenizer JavaDoc;
42
43 /**
44  * Processes entries and sends to appropriate groups.
45  * Eats up inappropriate entries.
46  *
47  */

48 class NNTPSpooler extends AbstractLogEnabled
49         implements Contextualizable, Configurable, Initializable {
50
51     /**
52      * The spooler context
53      */

54     private Context context;
55
56     /**
57      * The array of spooler runnables, each associated with a Worker thread
58      */

59     private SpoolerRunnable[] worker;
60
61     /**
62      * The directory containing entries to be spooled.
63      */

64     private File JavaDoc spoolPath;
65
66     /**
67      * The String form of the spool directory.
68      */

69     private String JavaDoc spoolPathString;
70
71     /**
72      * The time the spooler threads sleep between processing
73      */

74     private int threadIdleTime = 0;
75
76     /**
77      * @see org.apache.avalon.framework.context.Contextualizable#contextualize(Context)
78      */

79     public void contextualize(final Context context)
80             throws ContextException {
81         this.context = context;
82     }
83
84     /**
85      * @see org.apache.avalon.framework.configuration.Configurable#configure(Configuration)
86      */

87     public void configure( Configuration configuration ) throws ConfigurationException {
88         int threadCount = configuration.getChild("threadCount").getValueAsInteger(1);
89         threadIdleTime = configuration.getChild("threadIdleTime").getValueAsInteger(60 * 1000);
90         spoolPathString = configuration.getChild("spoolPath").getValue();
91         worker = new SpoolerRunnable[threadCount];
92     }
93
94     /**
95      * @see org.apache.avalon.framework.activity.Initializable#initialize()
96      */

97     public void initialize() throws Exception JavaDoc {
98         //System.out.println(getClass().getName()+": init");
99

100         try {
101             spoolPath = AvalonContextUtilities.getFile(context, spoolPathString);
102             if ( spoolPath.exists() == false ) {
103                 spoolPath.mkdirs();
104             } else if (!(spoolPath.isDirectory())) {
105                 StringBuffer JavaDoc errorBuffer =
106                     new StringBuffer JavaDoc(128)
107                         .append("Spool directory is improperly configured. The specified path ")
108                         .append(spoolPathString)
109                         .append(" is not a directory.");
110                 throw new ConfigurationException(errorBuffer.toString());
111             }
112         } catch (Exception JavaDoc e) {
113             getLogger().fatalError(e.getMessage(), e);
114             throw e;
115         }
116
117         for ( int i = 0 ; i < worker.length ; i++ ) {
118             worker[i] = new SpoolerRunnable(threadIdleTime,spoolPath);
119             if ( worker[i] instanceof LogEnabled ) {
120                 ((LogEnabled)worker[i]).enableLogging(getLogger());
121             }
122         }
123
124         // TODO: Replace this with a standard Avalon thread pool
125
for ( int i = 0 ; i < worker.length ; i++ ) {
126             new Thread JavaDoc(worker[i],"NNTPSpool-"+i).start();
127         }
128     }
129
130     /**
131      * Sets the repository used by this spooler.
132      *
133      * @param repo the repository to be used
134      */

135     void setRepository(NNTPRepository repo) {
136         for ( int i = 0 ; i < worker.length ; i++ ) {
137             worker[i].setRepository(repo);
138         }
139     }
140
141     /**
142      * Sets the article id repository used by this spooler.
143      *
144      * @param articleIDRepo the article id repository to be used
145      */

146     void setArticleIDRepository(ArticleIDRepository articleIDRepo) {
147         for ( int i = 0 ; i < worker.length ; i++ ) {
148             worker[i].setArticleIDRepository(articleIDRepo);
149         }
150     }
151
152     /**
153      * Returns (and creates, if the directory doesn't already exist) the
154      * spool directory
155      *
156      * @return the spool directory
157      */

158     File JavaDoc getSpoolPath() {
159         return spoolPath;
160     }
161
162     /**
163      * A static inner class that provides the body for the spool
164      * threads.
165      */

166     static class SpoolerRunnable extends AbstractLogEnabled implements Runnable JavaDoc {
167
168         private static final Lock lock = new Lock();
169
170         /**
171          * The directory containing entries to be spooled.
172          */

173         private final File JavaDoc spoolPath;
174
175         /**
176          * The time the spooler thread sleeps between processing
177          */

178         private final int threadIdleTime;
179
180         /**
181          * The article ID repository used by this spooler thread
182          */

183         private ArticleIDRepository articleIDRepo;
184
185         /**
186          * The NNTP repository used by this spooler thread
187          */

188         private NNTPRepository repo;
189
190         SpoolerRunnable(int threadIdleTime,File JavaDoc spoolPath) {
191             this.threadIdleTime = threadIdleTime;
192             this.spoolPath = spoolPath;
193         }
194
195         /**
196          * Sets the article id repository used by this spooler thread.
197          *
198          * @param articleIDRepo the article id repository to be used
199          */

200         void setArticleIDRepository(ArticleIDRepository articleIDRepo) {
201             this.articleIDRepo = articleIDRepo;
202         }
203
204         /**
205          * Sets the repository used by this spooler thread.
206          *
207          * @param repo the repository to be used
208          */

209         void setRepository(NNTPRepository repo) {
210             this.repo = repo;
211         }
212
213         /**
214          * The threads race to grab a lock. if a thread wins it processes the article,
215          * if it loses it tries to lock and process the next article.
216          */

217         public void run() {
218             getLogger().debug(Thread.currentThread().getName() + " is the NNTP spooler thread.");
219             try {
220                 while ( Thread.currentThread().interrupted() == false ) {
221                     String JavaDoc[] list = spoolPath.list();
222                     if (list.length > 0) getLogger().debug("Files to process: "+list.length);
223                     for ( int i = 0 ; i < list.length ; i++ ) {
224                         if ( lock.lock(list[i]) ) {
225                             File JavaDoc f = new File JavaDoc(spoolPath,list[i]).getAbsoluteFile();
226                             getLogger().debug("Processing file: "+f.getAbsolutePath());
227                             try {
228                                 process(f);
229                             } catch(Throwable JavaDoc ex) {
230                                 getLogger().debug("Exception occured while processing file: "+
231                                                   f.getAbsolutePath(),ex);
232                             } finally {
233                                 lock.unlock(list[i]);
234                             }
235                         }
236                         list[i] = null; // release the string entry;
237
}
238                     list = null; // release the array;
239
// this is good for other non idle threads
240
try {
241                         Thread.currentThread().sleep(threadIdleTime);
242                     } catch(InterruptedException JavaDoc ex) {
243                         // Ignore and continue
244
}
245                 }
246             } finally {
247                 Thread.currentThread().interrupted();
248             }
249         }
250
251         /**
252          * Process a file stored in the spool.
253          *
254          * @param f the spool file being processed
255          */

256         private void process(File JavaDoc spoolFile) throws Exception JavaDoc {
257             StringBuffer JavaDoc logBuffer =
258                 new StringBuffer JavaDoc(160)
259                         .append("process: ")
260                         .append(spoolFile.getAbsolutePath())
261                         .append(",")
262                         .append(spoolFile.getCanonicalPath());
263             getLogger().debug(logBuffer.toString());
264             final MimeMessage JavaDoc msg;
265             String JavaDoc articleID;
266             // TODO: Why is this a block?
267
{ // Get the message for copying to destination groups.
268
FileInputStream JavaDoc fin = new FileInputStream JavaDoc(spoolFile);
269                 try {
270                     msg = new MimeMessage JavaDoc(null,fin);
271                 } finally {
272                     IOUtil.shutdownStream(fin);
273                 }
274
275                 String JavaDoc lineCount = null;
276                 String JavaDoc[] lineCountHeader = msg.getHeader("Lines");
277                 if (lineCountHeader == null || lineCountHeader.length == 0) {
278                     BufferedReader JavaDoc rdr = new BufferedReader JavaDoc(new InputStreamReader JavaDoc(msg.getDataHandler().getInputStream()));
279                     int lines = 0;
280                     while (rdr.readLine() != null) {
281                         lines++;
282                     }
283
284                     lineCount = Integer.toString(lines);
285                     rdr.close();
286
287                     msg.setHeader("Lines", lineCount);
288                 }
289
290                 // ensure no duplicates exist.
291
String JavaDoc[] idheader = msg.getHeader("Message-Id");
292                 articleID = ((idheader != null && (idheader.length > 0))? idheader[0] : null);
293                 if ((articleID != null) && ( articleIDRepo.isExists(articleID))) {
294                     getLogger().debug("Message already exists: "+articleID);
295                     if (spoolFile.delete() == false)
296                         getLogger().error("Could not delete duplicate message from spool: " + spoolFile.getAbsolutePath());
297                     return;
298                 }
299                 if ( articleID == null || lineCount != null) {
300                     if (articleID == null) {
301                         articleID = articleIDRepo.generateArticleID();
302                         msg.setHeader("Message-Id", articleID);
303                     }
304                     FileOutputStream JavaDoc fout = new FileOutputStream JavaDoc(spoolFile);
305                     try {
306                         msg.writeTo(fout);
307                     } finally {
308                         IOUtil.shutdownStream(fout);
309                     }
310                 }
311             }
312
313             String JavaDoc[] headers = msg.getHeader("Newsgroups");
314             Properties JavaDoc prop = new Properties JavaDoc();
315             if (headers != null) {
316                 for ( int i = 0 ; i < headers.length ; i++ ) {
317                     StringTokenizer JavaDoc tokenizer = new StringTokenizer JavaDoc(headers[i],",");
318                     while ( tokenizer.hasMoreTokens() ) {
319                         String JavaDoc groupName = tokenizer.nextToken().trim();
320                         getLogger().debug("Copying message to group: "+groupName);
321                         NNTPGroup group = repo.getGroup(groupName);
322                         if ( group == null ) {
323                             getLogger().error("Couldn't add article with article ID " + articleID + " to group " + groupName + " - group not found.");
324                             continue;
325                         }
326
327                         FileInputStream JavaDoc newsStream = new FileInputStream JavaDoc(spoolFile);
328                         try {
329                             NNTPArticle article = group.addArticle(newsStream);
330                             prop.setProperty(group.getName(),article.getArticleNumber() + "");
331                         } finally {
332                             IOUtil.shutdownStream(newsStream);
333                         }
334                     }
335                 }
336             }
337             articleIDRepo.addArticle(articleID,prop);
338             boolean delSuccess = spoolFile.delete();
339             if ( delSuccess == false ) {
340                 getLogger().error("Could not delete file: " + spoolFile.getAbsolutePath());
341             }
342         }
343     } // class SpoolerRunnable
344
}
345
Popular Tags