1 17 18 package org.apache.james.nntpserver.repository; 19 20 import org.apache.avalon.excalibur.io.IOUtil; 21 import org.apache.avalon.framework.activity.Initializable; 22 import org.apache.avalon.framework.configuration.Configurable; 23 import org.apache.avalon.framework.configuration.Configuration; 24 import org.apache.avalon.framework.configuration.ConfigurationException; 25 import org.apache.avalon.framework.context.Context; 26 import org.apache.avalon.framework.context.ContextException; 27 import org.apache.avalon.framework.context.Contextualizable; 28 import org.apache.avalon.framework.logger.AbstractLogEnabled; 29 import org.apache.avalon.framework.logger.LogEnabled; 30 import org.apache.james.context.AvalonContextUtilities; 31 import org.apache.james.util.Lock; 32 33 import javax.mail.internet.MimeMessage ; 34 import java.io.BufferedReader ; 35 import java.io.File ; 36 import java.io.FileInputStream ; 37 import java.io.FileOutputStream ; 38 import java.io.InputStreamReader ; 39 import java.io.IOException ; 40 import java.util.Properties ; 41 import java.util.StringTokenizer ; 42 43 48 class NNTPSpooler extends AbstractLogEnabled 49 implements Contextualizable, Configurable, Initializable { 50 51 54 private Context context; 55 56 59 private SpoolerRunnable[] worker; 60 61 64 private File spoolPath; 65 66 69 private String spoolPathString; 70 71 74 private int threadIdleTime = 0; 75 76 79 public void contextualize(final Context context) 80 throws ContextException { 81 this.context = context; 82 } 83 84 87 public void configure( Configuration configuration ) throws ConfigurationException { 88 int threadCount = configuration.getChild("threadCount").getValueAsInteger(1); 89 threadIdleTime = configuration.getChild("threadIdleTime").getValueAsInteger(60 * 1000); 90 spoolPathString = configuration.getChild("spoolPath").getValue(); 91 worker = new SpoolerRunnable[threadCount]; 92 } 93 94 97 public void initialize() throws Exception { 98 100 try { 101 spoolPath = AvalonContextUtilities.getFile(context, spoolPathString); 102 if ( spoolPath.exists() == false ) { 103 spoolPath.mkdirs(); 104 } else if (!(spoolPath.isDirectory())) { 105 StringBuffer errorBuffer = 106 new StringBuffer (128) 107 .append("Spool directory is improperly configured. The specified path ") 108 .append(spoolPathString) 109 .append(" is not a directory."); 110 throw new ConfigurationException(errorBuffer.toString()); 111 } 112 } catch (Exception e) { 113 getLogger().fatalError(e.getMessage(), e); 114 throw e; 115 } 116 117 for ( int i = 0 ; i < worker.length ; i++ ) { 118 worker[i] = new SpoolerRunnable(threadIdleTime,spoolPath); 119 if ( worker[i] instanceof LogEnabled ) { 120 ((LogEnabled)worker[i]).enableLogging(getLogger()); 121 } 122 } 123 124 for ( int i = 0 ; i < worker.length ; i++ ) { 126 new Thread (worker[i],"NNTPSpool-"+i).start(); 127 } 128 } 129 130 135 void setRepository(NNTPRepository repo) { 136 for ( int i = 0 ; i < worker.length ; i++ ) { 137 worker[i].setRepository(repo); 138 } 139 } 140 141 146 void setArticleIDRepository(ArticleIDRepository articleIDRepo) { 147 for ( int i = 0 ; i < worker.length ; i++ ) { 148 worker[i].setArticleIDRepository(articleIDRepo); 149 } 150 } 151 152 158 File getSpoolPath() { 159 return spoolPath; 160 } 161 162 166 static class SpoolerRunnable extends AbstractLogEnabled implements Runnable { 167 168 private static final Lock lock = new Lock(); 169 170 173 private final File spoolPath; 174 175 178 private final int threadIdleTime; 179 180 183 private ArticleIDRepository articleIDRepo; 184 185 188 private NNTPRepository repo; 189 190 SpoolerRunnable(int threadIdleTime,File spoolPath) { 191 this.threadIdleTime = threadIdleTime; 192 this.spoolPath = spoolPath; 193 } 194 195 200 void setArticleIDRepository(ArticleIDRepository articleIDRepo) { 201 this.articleIDRepo = articleIDRepo; 202 } 203 204 209 void setRepository(NNTPRepository repo) { 210 this.repo = repo; 211 } 212 213 217 public void run() { 218 getLogger().debug(Thread.currentThread().getName() + " is the NNTP spooler thread."); 219 try { 220 while ( Thread.currentThread().interrupted() == false ) { 221 String [] list = spoolPath.list(); 222 if (list.length > 0) getLogger().debug("Files to process: "+list.length); 223 for ( int i = 0 ; i < list.length ; i++ ) { 224 if ( lock.lock(list[i]) ) { 225 File f = new File (spoolPath,list[i]).getAbsoluteFile(); 226 getLogger().debug("Processing file: "+f.getAbsolutePath()); 227 try { 228 process(f); 229 } catch(Throwable ex) { 230 getLogger().debug("Exception occured while processing file: "+ 231 f.getAbsolutePath(),ex); 232 } finally { 233 lock.unlock(list[i]); 234 } 235 } 236 list[i] = null; } 238 list = null; try { 241 Thread.currentThread().sleep(threadIdleTime); 242 } catch(InterruptedException ex) { 243 } 245 } 246 } finally { 247 Thread.currentThread().interrupted(); 248 } 249 } 250 251 256 private void process(File spoolFile) throws Exception { 257 StringBuffer logBuffer = 258 new StringBuffer (160) 259 .append("process: ") 260 .append(spoolFile.getAbsolutePath()) 261 .append(",") 262 .append(spoolFile.getCanonicalPath()); 263 getLogger().debug(logBuffer.toString()); 264 final MimeMessage msg; 265 String articleID; 266 { FileInputStream fin = new FileInputStream (spoolFile); 269 try { 270 msg = new MimeMessage (null,fin); 271 } finally { 272 IOUtil.shutdownStream(fin); 273 } 274 275 String lineCount = null; 276 String [] lineCountHeader = msg.getHeader("Lines"); 277 if (lineCountHeader == null || lineCountHeader.length == 0) { 278 BufferedReader rdr = new BufferedReader (new InputStreamReader (msg.getDataHandler().getInputStream())); 279 int lines = 0; 280 while (rdr.readLine() != null) { 281 lines++; 282 } 283 284 lineCount = Integer.toString(lines); 285 rdr.close(); 286 287 msg.setHeader("Lines", lineCount); 288 } 289 290 String [] idheader = msg.getHeader("Message-Id"); 292 articleID = ((idheader != null && (idheader.length > 0))? idheader[0] : null); 293 if ((articleID != null) && ( articleIDRepo.isExists(articleID))) { 294 getLogger().debug("Message already exists: "+articleID); 295 if (spoolFile.delete() == false) 296 getLogger().error("Could not delete duplicate message from spool: " + spoolFile.getAbsolutePath()); 297 return; 298 } 299 if ( articleID == null || lineCount != null) { 300 if (articleID == null) { 301 articleID = articleIDRepo.generateArticleID(); 302 msg.setHeader("Message-Id", articleID); 303 } 304 FileOutputStream fout = new FileOutputStream (spoolFile); 305 try { 306 msg.writeTo(fout); 307 } finally { 308 IOUtil.shutdownStream(fout); 309 } 310 } 311 } 312 313 String [] headers = msg.getHeader("Newsgroups"); 314 Properties prop = new Properties (); 315 if (headers != null) { 316 for ( int i = 0 ; i < headers.length ; i++ ) { 317 StringTokenizer tokenizer = new StringTokenizer (headers[i],","); 318 while ( tokenizer.hasMoreTokens() ) { 319 String groupName = tokenizer.nextToken().trim(); 320 getLogger().debug("Copying message to group: "+groupName); 321 NNTPGroup group = repo.getGroup(groupName); 322 if ( group == null ) { 323 getLogger().error("Couldn't add article with article ID " + articleID + " to group " + groupName + " - group not found."); 324 continue; 325 } 326 327 FileInputStream newsStream = new FileInputStream (spoolFile); 328 try { 329 NNTPArticle article = group.addArticle(newsStream); 330 prop.setProperty(group.getName(),article.getArticleNumber() + ""); 331 } finally { 332 IOUtil.shutdownStream(newsStream); 333 } 334 } 335 } 336 } 337 articleIDRepo.addArticle(articleID,prop); 338 boolean delSuccess = spoolFile.delete(); 339 if ( delSuccess == false ) { 340 getLogger().error("Could not delete file: " + spoolFile.getAbsolutePath()); 341 } 342 } 343 } } 345 | Popular Tags |