1 19 20 21 package org.apache.james.test.mock.james; 22 23 import org.apache.avalon.framework.activity.Disposable; 24 import org.apache.avalon.framework.container.ContainerUtil; 25 import org.apache.james.core.MailImpl; 26 import org.apache.james.services.SpoolRepository; 27 import org.apache.james.test.mock.avalon.MockLogger; 28 import org.apache.james.util.Lock; 29 import org.apache.mailet.Mail; 30 31 import javax.mail.MessagingException ; 32 33 import java.util.ArrayList ; 34 import java.util.Collection ; 35 import java.util.ConcurrentModificationException ; 36 import java.util.Hashtable ; 37 import java.util.Iterator ; 38 39 50 public class InMemorySpoolRepository 51 implements SpoolRepository, Disposable { 52 53 56 protected final static boolean DEEP_DEBUG = true; 57 private Lock lock; 58 private MockLogger logger; 59 private Hashtable spool; 60 61 private MockLogger getLogger() { 62 if (logger == null) { 63 logger = new MockLogger(); 64 } 65 return logger; 66 } 67 68 75 public boolean unlock(String key) { 76 if (lock.unlock(key)) { 77 if ((DEEP_DEBUG) && (getLogger().isDebugEnabled())) { 78 StringBuffer debugBuffer = 79 new StringBuffer (256) 80 .append("Unlocked ") 81 .append(key) 82 .append(" for ") 83 .append(Thread.currentThread().getName()) 84 .append(" @ ") 85 .append(new java.util.Date (System.currentTimeMillis())); 86 getLogger().debug(debugBuffer.toString()); 87 } 88 return true; 89 } else { 90 return false; 91 } 92 } 93 94 101 public boolean lock(String key) { 102 if (lock.lock(key)) { 103 if ((DEEP_DEBUG) && (getLogger().isDebugEnabled())) { 104 StringBuffer debugBuffer = 105 new StringBuffer (256) 106 .append("Locked ") 107 .append(key) 108 .append(" for ") 109 .append(Thread.currentThread().getName()) 110 .append(" @ ") 111 .append(new java.util.Date (System.currentTimeMillis())); 112 getLogger().debug(debugBuffer.toString()); 113 } 114 return true; 118 } else { 119 return false; 120 } 121 } 122 123 129 public void store(Mail mc) throws MessagingException { 130 try { 131 String key = mc.getName(); 132 boolean wasLocked = true; 134 synchronized (this) { 135 wasLocked = lock.isLocked(key); 136 137 if (!wasLocked) { 138 lock(key); 140 } 141 } 142 try { 143 MailImpl m = new MailImpl(mc,mc.getName()); 144 m.setState(mc.getState()); 145 spool.put(mc.getName(),m); 146 } finally { 147 if (!wasLocked) { 148 unlock(key); 150 synchronized (this) { 151 notify(); 152 } 153 } 154 } 155 156 if ((DEEP_DEBUG) && (getLogger().isDebugEnabled())) { 157 StringBuffer logBuffer = 158 new StringBuffer (64) 159 .append("Mail ") 160 .append(key) 161 .append(" stored."); 162 getLogger().debug(logBuffer.toString()); 163 } 164 165 } catch (Exception e) { 166 getLogger().error("Exception storing mail: " + e,e); 167 throw new MessagingException ("Exception caught while storing Message Container: ",e); 168 } 169 } 170 171 178 public Mail retrieve(String key) throws MessagingException { 179 if ((DEEP_DEBUG) && (getLogger().isDebugEnabled())) { 180 getLogger().debug("Retrieving mail: " + key); 181 } 182 try { 183 Mail mc = null; 184 try { 185 mc = new MailImpl((Mail) spool.get(key),key); 186 mc.setState(((Mail) spool.get(key)).getState()); 187 } 188 catch (RuntimeException re){ 189 StringBuffer exceptionBuffer = new StringBuffer (128); 190 if(re.getCause() instanceof Error ){ 191 exceptionBuffer.append("Error when retrieving mail, not deleting: ") 192 .append(re.toString()); 193 }else{ 194 exceptionBuffer.append("Exception retrieving mail: ") 195 .append(re.toString()) 196 .append(", so we're deleting it."); 197 remove(key); 198 } 199 getLogger().warn(exceptionBuffer.toString()); 200 return null; 201 } 202 return mc; 203 } catch (Exception me) { 204 getLogger().error("Exception retrieving mail: " + me); 205 throw new MessagingException ("Exception while retrieving mail: " + me.getMessage()); 206 } 207 } 208 209 214 public void remove(Mail mail) throws MessagingException { 215 remove(mail.getName()); 216 } 217 218 219 225 public void remove(Collection mails) throws MessagingException { 226 Iterator delList = mails.iterator(); 227 while (delList.hasNext()) { 228 remove((Mail)delList.next()); 229 } 230 } 231 232 237 public void remove(String key) throws MessagingException { 238 if (lock(key)) { 239 try { 240 if (spool != null) { 241 Object o = spool.remove(key); 242 ContainerUtil.dispose(o); 243 } 244 } finally { 245 unlock(key); 246 } 247 } else { 248 StringBuffer exceptionBuffer = 249 new StringBuffer (64) 250 .append("Cannot lock ") 251 .append(key) 252 .append(" to remove it"); 253 throw new MessagingException (exceptionBuffer.toString()); 254 } 255 } 256 257 263 public Iterator list() { 264 final ArrayList clone; 267 synchronized(spool) { 268 clone = new ArrayList (spool.keySet()); 269 } 270 return clone.iterator(); 271 } 272 273 274 283 public synchronized Mail accept() throws InterruptedException { 284 if ((DEEP_DEBUG) && (getLogger().isDebugEnabled())) { 285 getLogger().debug("Method accept() called"); 286 } 287 return accept(new SpoolRepository.AcceptFilter () { 288 public boolean accept (String _, String __, long ___, String ____) { 289 return true; 290 } 291 292 public long getWaitTime () { 293 return 0; 294 } 295 }); 296 } 297 298 309 public synchronized Mail accept(final long delay) throws InterruptedException 310 { 311 if ((DEEP_DEBUG) && (getLogger().isDebugEnabled())) { 312 getLogger().debug("Method accept(delay) called"); 313 } 314 return accept(new SpoolRepository.AcceptFilter () { 315 long youngest = 0; 316 317 public boolean accept (String key, String state, long lastUpdated, String errorMessage) { 318 if (state.equals(Mail.ERROR)) { 319 long timeToProcess = delay + lastUpdated; 321 322 if (System.currentTimeMillis() > timeToProcess) { 323 return true; 325 } else { 326 if (youngest == 0 || youngest > timeToProcess) { 328 youngest = timeToProcess; 330 } 331 return false; 332 } 333 } else { 334 return true; 336 } 337 } 338 339 public long getWaitTime () { 340 if (youngest == 0) { 341 return 0; 342 } else { 343 long duration = youngest - System.currentTimeMillis(); 344 youngest = 0; return duration <= 0 ? 1 : duration; 346 } 347 } 348 }); 349 } 350 351 352 364 public synchronized Mail accept(SpoolRepository.AcceptFilter filter) throws InterruptedException { 365 if ((DEEP_DEBUG) && (getLogger().isDebugEnabled())) { 366 getLogger().debug("Method accept(Filter) called"); 367 } 368 while (!Thread.currentThread().isInterrupted()) try { 369 for (Iterator it = list(); it.hasNext(); ) { 370 String s = it.next().toString(); 371 if ((DEEP_DEBUG) && (getLogger().isDebugEnabled())) { 372 StringBuffer logBuffer = 373 new StringBuffer (64) 374 .append("Found item ") 375 .append(s) 376 .append(" in spool."); 377 getLogger().debug(logBuffer.toString()); 378 } 379 if (lock(s)) { 380 if ((DEEP_DEBUG) && (getLogger().isDebugEnabled())) { 381 getLogger().debug("accept(Filter) has locked: " + s); 382 } 383 try { 384 Mail mail = retrieve(s); 385 if (mail == null || !filter.accept (mail.getName(), 389 mail.getState(), 390 mail.getLastUpdated().getTime(), 391 mail.getErrorMessage())) { 392 unlock(s); 393 continue; 394 } 395 return mail; 396 } catch (javax.mail.MessagingException e) { 397 unlock(s); 398 getLogger().error("Exception during retrieve -- skipping item " + s, e); 399 } 400 } 401 } 402 403 wait (filter.getWaitTime()); 405 } catch (InterruptedException ex) { 406 throw ex; 407 } catch (ConcurrentModificationException cme) { 408 getLogger().error("CME in spooler - please report to http://james.apache.org", cme); 410 } 411 throw new InterruptedException (); 412 } 413 414 417 public InMemorySpoolRepository() { 418 spool = new Hashtable (); 419 lock = new Lock(); 420 } 421 422 public int size() { 423 return spool.size(); 424 } 425 426 public void clear() { 427 if (spool != null) { 428 Iterator i = list(); 429 while (i.hasNext()) { 430 String key = (String ) i.next(); 431 try { 432 remove(key); 433 } catch (MessagingException e) { 434 } 435 } 436 } 437 } 438 439 public void dispose() { 440 clear(); 441 } 442 443 public String toString() { 444 StringBuffer result = new StringBuffer (); 445 result.append(super.toString()); 446 Iterator i = list(); 447 while (i.hasNext()) { 448 result.append("\n\t"+i.next()); 449 } 450 return result.toString(); 451 } 452 453 } 454 | Popular Tags |