1 17 18 package org.apache.james.mailrepository; 19 20 import org.apache.avalon.framework.configuration.Configuration; 21 import org.apache.avalon.framework.configuration.ConfigurationException; 22 23 import org.apache.james.services.SpoolRepository; 24 import org.apache.james.core.MailImpl; 25 import org.apache.mailet.Mail; 26 27 import java.sql.Connection ; 28 import java.sql.PreparedStatement ; 29 import java.sql.ResultSet ; 30 import java.sql.SQLException ; 31 import java.sql.Statement ; 32 import java.util.LinkedList ; 33 34 89 public class JDBCSpoolRepository extends JDBCMailRepository implements SpoolRepository { 90 91 94 private static int WAIT_LIMIT = 60000; 95 98 private static int LOAD_TIME_MININUM = 1000; 99 102 private LinkedList pendingMessages = new LinkedList (); 103 106 private long pendingMessagesLoadTime = 0; 107 110 private int maxPendingMessages = 0; 111 112 115 public void configure(Configuration conf) throws ConfigurationException { 116 super.configure(conf); 117 maxPendingMessages = conf.getChild("maxcache").getValueAsInteger(1000); 118 } 119 120 123 public Mail accept() throws InterruptedException { 124 return accept(new SpoolRepository.AcceptFilter () { 125 public boolean accept (String _, String __, long ___, String ____) { 126 return true; 127 } 128 129 public long getWaitTime () { 130 return 0; 131 } 132 }); 133 } 134 135 140 public synchronized Mail accept(final long delay) throws InterruptedException { 141 return accept (new SpoolRepository.AcceptFilter () { 142 long sleepUntil = 0; 143 144 public boolean accept (String key, String state, long lastUpdated, String errorMessage) { 145 if (Mail.ERROR.equals(state)) { 146 long processingTime = delay + lastUpdated; 148 if (processingTime < System.currentTimeMillis()) { 149 return true; 151 } else { 152 if (sleepUntil == 0 || processingTime < sleepUntil) { 155 sleepUntil = processingTime; 156 } 157 return false; 158 } 159 } else { 160 return true; 161 } 162 } 163 164 165 public long getWaitTime () { 166 if (sleepUntil == 0) { 167 sleepUntil = System.currentTimeMillis(); 168 } 169 long waitTime = sleepUntil - System.currentTimeMillis(); 170 sleepUntil = 0; 171 return waitTime <= 0 ? 1 : waitTime; 172 } 173 174 }); 175 } 176 177 187 public synchronized Mail accept(SpoolRepository.AcceptFilter filter) throws InterruptedException { 188 while (!Thread.currentThread().isInterrupted()) { 189 PendingMessage next = null; 192 while ((next = getNextPendingMessage()) != null && !Thread.currentThread().isInterrupted()) { 193 195 boolean shouldProcess = filter.accept (next.key, next.state, next.lastUpdated, next.errorMessage); 196 197 if (shouldProcess && lock(next.key)) { 198 try { 199 MailImpl mail = retrieve(next.key); 200 if (mail == null) { 204 unlock(next.key); 205 continue; 206 } 207 return mail; 208 } catch (javax.mail.MessagingException e) { 209 unlock(next.key); 210 getLogger().error("Exception during retrieve -- skipping item " + next.key, e); 211 } 212 } 213 } 214 long wait_time = filter.getWaitTime(); 216 if (wait_time <= 0) { 217 wait_time = WAIT_LIMIT; 218 } 219 try { 220 synchronized (this) { 221 wait (wait_time); 222 } 223 } catch (InterruptedException ex) { 224 throw ex; 225 } 226 } 227 throw new InterruptedException (); 228 } 229 230 236 public void store(MailImpl mc) throws javax.mail.MessagingException { 237 pendingMessagesLoadTime = 0; 238 super.store(mc); 239 } 240 241 246 private PendingMessage getNextPendingMessage() { 247 synchronized (pendingMessages) { 249 if (pendingMessages.size() == 0 && pendingMessagesLoadTime < System.currentTimeMillis()) { 250 pendingMessagesLoadTime = LOAD_TIME_MININUM + System.currentTimeMillis(); 251 loadPendingMessages(); 252 } 253 254 if (pendingMessages.size() == 0) { 255 return null; 256 } else { 257 return (PendingMessage)pendingMessages.removeFirst(); 259 } 260 } 261 } 262 263 266 private void loadPendingMessages() { 267 synchronized (pendingMessages) { 270 pendingMessages.clear(); 271 272 Connection conn = null; 273 PreparedStatement listMessages = null; 274 ResultSet rsListMessages = null; 275 try { 276 conn = datasource.getConnection(); 277 listMessages = 278 conn.prepareStatement(sqlQueries.getSqlString("listMessagesSQL", true)); 279 listMessages.setString(1, repositoryName); 280 listMessages.setMaxRows(maxPendingMessages); 281 rsListMessages = listMessages.executeQuery(); 282 while (rsListMessages.next() && pendingMessages.size() < maxPendingMessages && !Thread.currentThread().isInterrupted()) { 287 String key = rsListMessages.getString(1); 288 String state = rsListMessages.getString(2); 289 long lastUpdated = rsListMessages.getTimestamp(3).getTime(); 290 String errorMessage = rsListMessages.getString(4); 291 pendingMessages.add(new PendingMessage(key, state, lastUpdated, errorMessage)); 292 } 293 } catch (SQLException sqle) { 294 getLogger().error("Error retrieving pending messages", sqle); 296 pendingMessagesLoadTime = LOAD_TIME_MININUM * 10 + System.currentTimeMillis(); 297 } finally { 298 theJDBCUtil.closeJDBCResultSet(rsListMessages); 299 theJDBCUtil.closeJDBCStatement(listMessages); 300 theJDBCUtil.closeJDBCConnection(conn); 301 } 302 } 303 } 304 305 308 class PendingMessage { 309 protected String key; 310 protected String state; 311 protected long lastUpdated; 312 protected String errorMessage; 313 314 public PendingMessage(String key, String state, long lastUpdated, String errorMessage) { 315 this.key = key; 316 this.state = state; 317 this.lastUpdated = lastUpdated; 318 this.errorMessage = errorMessage; 319 } 320 } 321 } 322 | Popular Tags |