KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > james > mailrepository > JDBCSpoolRepository


1 /***********************************************************************
2  * Copyright (c) 1999-2004 The Apache Software Foundation. *
3  * All rights reserved. *
4  * ------------------------------------------------------------------- *
5  * Licensed under the Apache License, Version 2.0 (the "License"); you *
6  * may not use this file except in compliance with the License. You *
7  * may obtain a copy of the License at: *
8  * *
9  * http://www.apache.org/licenses/LICENSE-2.0 *
10  * *
11  * Unless required by applicable law or agreed to in writing, software *
12  * distributed under the License is distributed on an "AS IS" BASIS, *
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or *
14  * implied. See the License for the specific language governing *
15  * permissions and limitations under the License. *
16  ***********************************************************************/

17
18 package org.apache.james.mailrepository;
19
20 import org.apache.avalon.framework.configuration.Configuration;
21 import org.apache.avalon.framework.configuration.ConfigurationException;
22
23 import org.apache.james.services.SpoolRepository;
24 import org.apache.james.core.MailImpl;
25 import org.apache.mailet.Mail;
26
27 import java.sql.Connection JavaDoc;
28 import java.sql.PreparedStatement JavaDoc;
29 import java.sql.ResultSet JavaDoc;
30 import java.sql.SQLException JavaDoc;
31 import java.sql.Statement JavaDoc;
32 import java.util.LinkedList JavaDoc;
33
34 /**
35  * Implementation of a SpoolRepository on a database.
36  *
37  * <p>Requires a configuration element in the .conf.xml file of the form:
38  * <br>&lt;repository destinationURL="town://path"
39  * <br> type="MAIL"
40  * <br> model="SYNCHRONOUS"/&gt;
41  * <br> &lt;driver&gt;sun.jdbc.odbc.JdbcOdbcDriver&lt;/conn&gt;
42  * <br> &lt;conn&gt;jdbc:odbc:LocalDB&lt;/conn&gt;
43  * <br> &lt;table&gt;Message&lt;/table&gt;
44  * <br>&lt;/repository&gt;
45  * <p>destinationURL specifies..(Serge??)
46  * <br>Type can be SPOOL or MAIL
47  * <br>Model is currently not used and may be dropped
48  * <br>conn is the location of the ...(Serge)
49  * <br>table is the name of the table in the Database to be used
50  *
51  * <p>Requires a logger called MailRepository.
52  *
53  * <p>Approach for spool manager:
54  *
55  * PendingMessage inner class
56  *
57  * accept() is called....
58  * checks whether needs to load PendingMessages()
59  * tries to get a message()
60  * if none, wait 60
61  *
62  * accept(long) is called
63  * checks whether needs to load PendingMessages
64  * tries to get a message(long)
65  * if none, wait accordingly
66  *
67  * sync checkswhetherneedstoloadPendingMessages()
68  * if pending messages has messages in immediate process, return immediately
69  * if run query in last WAIT_LIMIT time, return immediately
70  * query and build 2 vectors of Pending messages.
71  * Ones that need immediate processing
72  * Ones that are delayed. put them in time order
73  * return
74  *
75  * get_a_message()
76  * loop through immediate messages.
77  * - remove top message
78  * - try to lock. if successful, return. otherwise loop.
79  * if nothing, return null
80  *
81  * get_a_message(long)
82  * try get_a_message()
83  * check top message in pending. if ready, then remove, try to lock, return if lock.
84  * return null.
85  *
86  *
87  * @version 1.0.0, 24/04/1999
88  */

89 public class JDBCSpoolRepository extends JDBCMailRepository implements SpoolRepository {
90
91     /**
92      * How long a thread should sleep when there are no messages to process.
93      */

94     private static int WAIT_LIMIT = 60000;
95     /**
96      * How long we have to wait before reloading the list of pending messages
97      */

98     private static int LOAD_TIME_MININUM = 1000;
99     /**
100      * A queue in memory of messages that need processing
101      */

102     private LinkedList JavaDoc pendingMessages = new LinkedList JavaDoc();
103     /**
104      * When the queue was last read
105      */

106     private long pendingMessagesLoadTime = 0;
107     /**
108      * Maximum size of the pendingMessages queue
109      */

110     private int maxPendingMessages = 0;
111
112     /**
113      * @see org.apache.avalon.framework.configuration.Configurable#configure(Configuration)
114      */

115     public void configure(Configuration conf) throws ConfigurationException {
116         super.configure(conf);
117         maxPendingMessages = conf.getChild("maxcache").getValueAsInteger(1000);
118     }
119
120     /**
121      * Return a message to process. This is a message in the spool that is not locked.
122      */

123     public Mail accept() throws InterruptedException JavaDoc {
124         return accept(new SpoolRepository.AcceptFilter () {
125             public boolean accept (String JavaDoc _, String JavaDoc __, long ___, String JavaDoc ____) {
126                 return true;
127             }
128
129             public long getWaitTime () {
130                 return 0;
131             }
132         });
133     }
134
135     /**
136      * Return a message that's ready to process. If a message is of type "error"
137      * then check the last updated time, and don't try it until the long 'delay' parameter
138      * milliseconds has passed.
139      */

140     public synchronized Mail accept(final long delay) throws InterruptedException JavaDoc {
141         return accept (new SpoolRepository.AcceptFilter () {
142             long sleepUntil = 0;
143                 
144                 public boolean accept (String JavaDoc key, String JavaDoc state, long lastUpdated, String JavaDoc errorMessage) {
145                     if (Mail.ERROR.equals(state)) {
146                     //if it's an error message, test the time
147
long processingTime = delay + lastUpdated;
148                     if (processingTime < System.currentTimeMillis()) {
149                         //It's time to process
150
return true;
151                     } else {
152                         //We don't process this, but we want to possibly reduce the amount of time
153
// we sleep so we wake when this message is ready.
154
if (sleepUntil == 0 || processingTime < sleepUntil) {
155                             sleepUntil = processingTime;
156                         }
157                             return false;
158                     }
159                 } else {
160                         return true;
161                     }
162                 }
163                 
164
165                 public long getWaitTime () {
166                     if (sleepUntil == 0) {
167                         sleepUntil = System.currentTimeMillis();
168                     }
169                     long waitTime = sleepUntil - System.currentTimeMillis();
170                     sleepUntil = 0;
171                     return waitTime <= 0 ? 1 : waitTime;
172                 }
173                 
174             });
175     }
176
177     /**
178      * Returns an arbitrarily selected mail deposited in this Repository for
179      * which the supplied filter's accept method returns true.
180      * Usage: RemoteDeliverySpool calls accept(filter) with some a filter which determines
181      * based on number of retries if the mail is ready for processing.
182      * If no message is ready the method will block until one is, the amount of time to block is
183      * determined by calling the filters getWaitTime method.
184      *
185      * @return the mail
186      */

187     public synchronized Mail accept(SpoolRepository.AcceptFilter filter) throws InterruptedException JavaDoc {
188         while (!Thread.currentThread().isInterrupted()) {
189             //Loop through until we are either out of pending messages or have a message
190
// that we can lock
191
PendingMessage next = null;
192             while ((next = getNextPendingMessage()) != null && !Thread.currentThread().isInterrupted()) {
193                 //Check whether this is time to expire
194

195                 boolean shouldProcess = filter.accept (next.key, next.state, next.lastUpdated, next.errorMessage);
196                 
197                 if (shouldProcess && lock(next.key)) {
198                     try {
199                         MailImpl mail = retrieve(next.key);
200                         // Retrieve can return null if the mail is no longer on the spool
201
// (i.e. another thread has gotten to it first).
202
// In this case we simply continue to the next key
203
if (mail == null) {
204                             unlock(next.key);
205                             continue;
206                         }
207                         return mail;
208                     } catch (javax.mail.MessagingException JavaDoc e) {
209                         unlock(next.key);
210                         getLogger().error("Exception during retrieve -- skipping item " + next.key, e);
211                     }
212                 }
213             }
214             //Nothing to do... sleep!
215
long wait_time = filter.getWaitTime();
216             if (wait_time <= 0) {
217                 wait_time = WAIT_LIMIT;
218             }
219             try {
220                 synchronized (this) {
221                     wait (wait_time);
222                 }
223             } catch (InterruptedException JavaDoc ex) {
224                 throw ex;
225             }
226         }
227         throw new InterruptedException JavaDoc();
228     }
229
230     /**
231      * Needs to override this method and reset the time to load to zero.
232      * This will force a reload of the pending messages queue once that
233      * is empty... a message that gets added will sit here until that queue
234      * time has passed and the list is then reloaded.
235      */

236     public void store(MailImpl mc) throws javax.mail.MessagingException JavaDoc {
237         pendingMessagesLoadTime = 0;
238         super.store(mc);
239     }
240
241     /**
242      * If not empty, gets the next pending message. Otherwise checks
243      * checks the last time pending messages was loaded and load if
244      * it's been more than 1 second (should be configurable).
245      */

246     private PendingMessage getNextPendingMessage() {
247         //System.err.println("Trying to get next message in " + repositoryName);
248
synchronized (pendingMessages) {
249             if (pendingMessages.size() == 0 && pendingMessagesLoadTime < System.currentTimeMillis()) {
250                 pendingMessagesLoadTime = LOAD_TIME_MININUM + System.currentTimeMillis();
251                 loadPendingMessages();
252             }
253
254             if (pendingMessages.size() == 0) {
255                 return null;
256             } else {
257                 //System.err.println("Returning a pending message in " + repositoryName);
258
return (PendingMessage)pendingMessages.removeFirst();
259             }
260         }
261     }
262
263     /**
264      * Retrieves the pending messages that are in the database
265      */

266     private void loadPendingMessages() {
267         //Loads a vector with PendingMessage objects
268
//System.err.println("loading pending messages in " + repositoryName);
269
synchronized (pendingMessages) {
270             pendingMessages.clear();
271
272             Connection JavaDoc conn = null;
273             PreparedStatement JavaDoc listMessages = null;
274             ResultSet JavaDoc rsListMessages = null;
275             try {
276                 conn = datasource.getConnection();
277                 listMessages =
278                     conn.prepareStatement(sqlQueries.getSqlString("listMessagesSQL", true));
279                 listMessages.setString(1, repositoryName);
280                 listMessages.setMaxRows(maxPendingMessages);
281                 rsListMessages = listMessages.executeQuery();
282                 // Continue to have it loop through the list of messages until we hit
283
// a possible message, or we retrieve maxPendingMessages messages.
284
// This maxPendingMessages cap is to avoid loading thousands or
285
// hundreds of thousands of messages when the spool is enourmous.
286
while (rsListMessages.next() && pendingMessages.size() < maxPendingMessages && !Thread.currentThread().isInterrupted()) {
287                     String JavaDoc key = rsListMessages.getString(1);
288                     String JavaDoc state = rsListMessages.getString(2);
289                     long lastUpdated = rsListMessages.getTimestamp(3).getTime();
290                     String JavaDoc errorMessage = rsListMessages.getString(4);
291                     pendingMessages.add(new PendingMessage(key, state, lastUpdated, errorMessage));
292                 }
293             } catch (SQLException JavaDoc sqle) {
294                 //Log it and avoid reloading for a bit
295
getLogger().error("Error retrieving pending messages", sqle);
296                 pendingMessagesLoadTime = LOAD_TIME_MININUM * 10 + System.currentTimeMillis();
297             } finally {
298                 theJDBCUtil.closeJDBCResultSet(rsListMessages);
299                 theJDBCUtil.closeJDBCStatement(listMessages);
300                 theJDBCUtil.closeJDBCConnection(conn);
301             }
302         }
303     }
304
305     /**
306      * Simple class to hold basic information about a message in the spool
307      */

308     class PendingMessage {
309         protected String JavaDoc key;
310         protected String JavaDoc state;
311         protected long lastUpdated;
312         protected String JavaDoc errorMessage;
313
314         public PendingMessage(String JavaDoc key, String JavaDoc state, long lastUpdated, String JavaDoc errorMessage) {
315             this.key = key;
316             this.state = state;
317             this.lastUpdated = lastUpdated;
318             this.errorMessage = errorMessage;
319         }
320     }
321 }
322
Popular Tags