KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > com > jcorporate > expresso > services > asyncprocess > DefaultAsyncProcessor


1 /* ====================================================================
2  * The Jcorporate Apache Style Software License, Version 1.2 05-07-2002
3  *
4  * Copyright (c) 1995-2002 Jcorporate Ltd. All rights reserved.
5  *
6  * Redistribution and use in source and binary forms, with or without
7  * modification, are permitted provided that the following conditions
8  * are met:
9  *
10  * 1. Redistributions of source code must retain the above copyright
11  * notice, this list of conditions and the following disclaimer.
12  *
13  * 2. Redistributions in binary form must reproduce the above copyright
14  * notice, this list of conditions and the following disclaimer in
15  * the documentation and/or other materials provided with the
16  * distribution.
17  *
18  * 3. The end-user documentation included with the redistribution,
19  * if any, must include the following acknowledgment:
20  * "This product includes software developed by Jcorporate Ltd.
21  * (http://www.jcorporate.com/)."
22  * Alternately, this acknowledgment may appear in the software itself,
23  * if and wherever such third-party acknowledgments normally appear.
24  *
25  * 4. "Jcorporate" and product names such as "Expresso" must
26  * not be used to endorse or promote products derived from this
27  * software without prior written permission. For written permission,
28  * please contact info@jcorporate.com.
29  *
30  * 5. Products derived from this software may not be called "Expresso",
31  * or other Jcorporate product names; nor may "Expresso" or other
32  * Jcorporate product names appear in their name, without prior
33  * written permission of Jcorporate Ltd.
34  *
35  * 6. No product derived from this software may compete in the same
36  * market space, i.e. framework, without prior written permission
37  * of Jcorporate Ltd. For written permission, please contact
38  * partners@jcorporate.com.
39  *
40  * THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED
41  * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
42  * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
43  * DISCLAIMED. IN NO EVENT SHALL JCORPORATE LTD OR ITS CONTRIBUTORS
44  * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
45  * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED
46  * TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
47  * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
48  * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
49  * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
50  * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
51  * SUCH DAMAGE.
52  * ====================================================================
53  *
54  * This software consists of voluntary contributions made by many
55  * individuals on behalf of the Jcorporate Ltd. Contributions back
56  * to the project(s) are encouraged when you make modifications.
57  * Please send them to support@jcorporate.com. For more information
58  * on Jcorporate Ltd. and its products, please see
59  * <http://www.jcorporate.com/>.
60  *
61  * Portions of this software are based upon other open source
62  * products and are subject to their respective licenses.
63  */

64
65 package com.jcorporate.expresso.services.asyncprocess;
66
67 import com.jcorporate.expresso.core.db.DBException;
68 import com.jcorporate.expresso.core.misc.StringUtil;
69 import com.jcorporate.expresso.core.registry.MutableRequestRegistry;
70 import com.jcorporate.expresso.core.registry.RequestRegistry;
71 import com.jcorporate.expresso.core.security.User;
72 import com.jcorporate.expresso.services.dbobj.Setup;
73 import org.apache.log4j.Logger;
74
75 import java.util.HashMap JavaDoc;
76 import java.util.LinkedList JavaDoc;
77 import java.util.Map JavaDoc;
78
79 /**
80  * This is a default implementation of the AsyncProcessor. It has specific
81  * claim timeout, specific threads and queue size, and a particular order of
82  * dealing with things. A different kind of implementation might, for example,
83  * dispatch the requests to another machine for processing and return the result.
84  * <p/>
85  * This asynchronous processor relies on the set up tables for the following parameters.
86  * parameters are missing from the set up table then the default values will be used instead.
87  * These parameters are:
88  * <ul>
89  * <li><b>AsyncClaimTimeout</b>: when the job is completed there is always the chance
90  * that it will never be claimed by the client a for example if they close their
91  * browser window. AsyncClaimTimeout specifies in milliseconds how long to wait
92  * before discarding
93  * a completed job. The default value is 30 minutes.</li>
94  * <li><b>AsyncNumThreads</b>: This value specifies how many threads
95  * the asynchronous processor has available to process waiting processes.
96  * the default value for this is 10 threads.</li>
97  * <li><b>AsyncQueueSize</b>: When all threads are occupied with running processes
98  * this parameter specifies how he processes to hold in a waiting queue
99  * before throwing a queue full exception. The default value for this
100  * is 20.</li>
101  * </ul>
102  * </p>
103  *
104  * @author Michael Rimov
105  * @version $Revision: 1.12 $ on $Date: 2004/11/17 20:48:17 $
106  */

107 public class DefaultAsyncProcessor
108         implements AsyncProcessor {
109
110     public static final String JavaDoc DEFAULT_PROCESSSOR_PROPERTIES_FILE =
111             "DefaultProcessor.properties";
112     private int nextIssuedTicket = 0;
113
114     private LinkedList JavaDoc waitQueue = new LinkedList JavaDoc();
115
116     private Map JavaDoc allProcesses = new HashMap JavaDoc();
117
118     private ProcessThread processThreads[];
119
120     private ThreadGroup JavaDoc processThreadGroup = new ThreadGroup JavaDoc("AsyncProcess Threads");
121
122     long claimTimeout = 30000;
123
124     int numThreads = 10;
125
126     int queueSize = 20;
127
128     private static final Logger log = Logger.getLogger(DefaultAsyncProcessor.class);
129
130     public DefaultAsyncProcessor() {
131         try {
132 // props.load(this.getClass()
133
// .getResourceAsStream("DefaultProcessor.properties"))
134
claimTimeout = Long.parseLong((StringUtil.notNull(Setup.getValue("default", "AsyncClaimTimeout")).length() != 0)
135                     ?
136                     Setup.getValue("default", "AsyncClaimTimeout") :
137                     "30000");
138 // claimTimeout = Long.parseLong(
139
// props.getProperty("claimTimeout","30000").trim());
140

141             numThreads = Integer.parseInt((StringUtil.notNull(Setup.getValue("default", "AsyncNumThreads")).length() != 0)
142                     ?
143                     Setup.getValue("default", "AsyncNumThreads") :
144                     "10");
145 // numThreads = Integer.parseInt(props.getProperty("numThreads","10").trim());
146

147             queueSize = Integer.parseInt((StringUtil.notNull(Setup.getValue("default", "AsyncQueueSize")).length() != 0)
148                     ?
149                     Setup.getValue("default", "AsyncQueueSize") :
150                     "20");
151
152 // queueSize = Integer.parseInt(props.getProperty("queueSize","20").trim());
153
} catch (DBException ex) {
154             log.error("Error loading properties", ex);
155         } catch (NumberFormatException JavaDoc ex) {
156             log.error("Error parsing setup values for Async processor. "
157                     + "Has this DB gone through initial setup? Using default "
158                     + "values instead.");
159         }
160
161         processThreads = new ProcessThread[numThreads];
162         for (int i = 0; i < numThreads; i++) {
163             processThreads[i] = new ProcessThread(this, processThreadGroup,
164                     "Processor" + i);
165         }
166
167         if (log.isInfoEnabled()) {
168             log.info("Starting processing threads");
169         }
170
171         for (int i = 0; i < numThreads; i++) {
172             processThreads[i].start();
173         }
174     }
175
176     /**
177      * Destroys all threads;
178      */

179     public synchronized void destroy() {
180         for (int i = 0; i < numThreads; i++) {
181             Thread JavaDoc aThread = processThreads[i];
182             if (aThread != null) {
183                 aThread.interrupt();
184             }
185         }
186
187         for (int i = 0; i < numThreads; i++) {
188             Thread JavaDoc aThread = processThreads[i];
189             if (aThread != null) {
190                 try {
191                     if (aThread != null) {
192                         aThread.join(1000);
193                     }
194                 } catch (InterruptedException JavaDoc ex) {
195                     log.info("Interrupted while waiting for process thread", ex);
196                 }
197
198                 if (aThread.isAlive()) {
199                     log.warn("After waiting a second, the process thread: " +
200                             aThread.getName()
201                             + " has still not exited");
202                 }
203             }
204         }
205
206         processThreads = null;
207         if (processThreadGroup != null) {
208             processThreadGroup.destroy();
209         }
210     }
211
212     /**
213      * Add an asynchronous process ot the queue
214      *
215      * @param newProcess the AsyncProcess implementation to run
216      * @return AsyncTicket object for use in reeming the process' status.
217      * @throws QueueFullException if there are too many jobs waiting.
218      */

219     public synchronized AsyncTicket addToQueue(AsyncProcess newProcess)
220             throws QueueFullException {
221         if (processThreads == null) {
222             throw new IllegalStateException JavaDoc("Async Processor has already shut down");
223         }
224
225         nextIssuedTicket++;
226         DefaultTicket ticket = new DefaultTicket(nextIssuedTicket);
227         ProcessWrapper wrapper = new ProcessWrapper(newProcess, ticket);
228         wrapper.getResult().setStatusCode(AsyncProcessResult.STATUS_PENDING);
229         DefaultTicket theTicket = new DefaultTicket(nextIssuedTicket);
230
231         synchronized (allProcesses) {
232             allProcesses.put(theTicket, wrapper);
233         }
234
235         synchronized (waitQueue) {
236             if (waitQueue.size() > queueSize) {
237                 throw new QueueFullException("Queue already has " +
238                         waitQueue.size()
239                         +
240                         " processes waiting. Cannot continue");
241             }
242
243             waitQueue.addLast(wrapper);
244             waitQueue.notify();
245         }
246         return theTicket;
247     }
248
249     /**
250      * Similar to addToQueue(AsyncProcess), but it tries to wait for the specified
251      * period of time before returning to see if the process completes during
252      * that time.
253      *
254      * @param newProcess The AsyncProcess to add to the queue
255      * @param waitTimeout the number of milliseconds to wait to see if the process
256      * completes in that time.
257      * @return AsyncTicket instance.
258      */

259
260     public AsyncTicket addToQueue(AsyncProcess newProcess, long waitTimeout)
261             throws QueueFullException {
262
263         AsyncTicket ticket = addToQueue(newProcess);
264         synchronized (newProcess) {
265             try {
266                 newProcess.wait(waitTimeout);
267             } catch (InterruptedException JavaDoc ex) {
268                 log.info("Interrupted while waiting for process", ex);
269                 return ticket;
270             }
271         }
272
273         synchronized (allProcesses) {
274             ProcessWrapper wrapper = this.getProcessWrapper(ticket);
275             if (wrapper == null) {
276                 return null;
277             }
278
279             if (wrapper.getResult().getStatusCode() ==
280                     AsyncProcessResult.STATUS_COMPLETE) {
281                 this.getProcessResult(ticket);
282                 return new DefaultTicket(-1);
283             }
284         }
285
286         return ticket;
287     }
288
289     /**
290      * Private wrapper.... MAKE SURE you synchronize all Processes before
291      * calling.
292      *
293      * @param ticketId the Async ticket to redeem
294      * @return ProcessWrapper or null if it doesn't exist.
295      */

296     private ProcessWrapper getProcessWrapper(AsyncTicket ticketId) {
297         ProcessWrapper wrapper = (ProcessWrapper) allProcesses.get(ticketId);
298         if (wrapper == null) {
299             return null;
300         }
301
302         return wrapper;
303     }
304
305     /**
306      * Retrieve the queue. This function purposefully has package access.
307      *
308      * @return java.util.LinkedList
309      */

310     LinkedList JavaDoc getQueue() {
311         return waitQueue;
312     }
313
314     /**
315      * Clean out any processes that have been completed over the timeout interval
316      * and nobody has claimed it.
317      */

318 // private synchronized void cleanProcesses() {
319
// long curTime = System.currentTimeMillis();
320
// synchronized(allProcesses) {
321
// for (Iterator i = allProcesses.values().iterator(); i.hasNext();) {
322
// ProcessWrapper oneProcess = (ProcessWrapper)i.next();
323
// if (oneProcess.getCompletedTime() + claimTimeout > curTime) {
324
// allProcesses.remove(oneProcess.getObjectId());
325
// }
326
// }
327
// }
328
// }
329

330
331     /**
332      * Wrapper object for the process queue
333      *
334      * @author Michael Rimov
335      * @version $Revision: 1.12 $ on $Date: 2004/11/17 20:48:17 $
336      */

337     class ProcessWrapper {
338         private AsyncProcess wrappedObject;
339
340         DefaultTicket objectId;
341
342         DefaultProcessResult result;
343
344         long queueTime;
345
346         long startTime;
347
348         long completedTime;
349
350         private String JavaDoc defaultDataContext;
351
352         private User defaultUser;
353
354         public ProcessWrapper(AsyncProcess objectToWrap,
355                               DefaultTicket newObjectId) {
356             queueTime = System.currentTimeMillis();
357             wrappedObject = objectToWrap;
358             objectId = newObjectId;
359             result = new DefaultProcessResult();
360             result.setOriginalProcess(objectToWrap);
361
362             //This object is created in one thread and run in another
363
//So we propagate the thread capabilities.
364
defaultDataContext = RequestRegistry.getDataContext();
365             defaultUser = RequestRegistry.getUser();
366         }
367
368         public AsyncProcess getWrappedObject() {
369             return wrappedObject;
370         }
371
372         public DefaultProcessResult getResult() {
373             return result;
374         }
375
376
377         /**
378          * Do the actual processing. Any exceptions are caught and saved
379          * in the process object itself and status fault is set.
380          */

381         public void process() {
382             //Set the context for this thread to be that of the
383
//spawning thread.
384
new MutableRequestRegistry(defaultDataContext, defaultUser);
385
386             startTime = System.currentTimeMillis();
387             this.getResult().setStatusCode(AsyncProcessResult.STATUS_RUNNING);
388             try {
389                 wrappedObject.process();
390             } catch (Throwable JavaDoc ex) {
391                 this.getResult().setException(ex);
392                 this.getResult().setStatusCode(AsyncProcessResult.STATUS_FAULT);
393             }
394             completedTime = System.currentTimeMillis();
395             this.getResult().setStatusCode(AsyncProcessResult.STATUS_COMPLETE);
396
397             //
398
//If somebody is waiting on the object, notify them so that
399
//they can continue.
400
//
401
synchronized (wrappedObject) {
402                 wrappedObject.notify();
403             }
404
405             //
406
//Most jobs tend to require quite a bit of memory. Schedule a GC
407
//as soon as the system can to clear the cruft left behind by
408
//the job.
409
//
410
System.gc();
411         }
412
413         public long getQueueTime() {
414             return queueTime;
415         }
416
417         public long getCompletedTime() {
418             return completedTime;
419         }
420
421         public DefaultTicket getObjectId() {
422             return objectId;
423         }
424
425     }
426
427
428     /**
429      * Retrieve the result of the process
430      *
431      * @param ticketId the ticket id of the process
432      * @return java.lang.Object
433      */

434     public AsyncProcessResult getProcessResult(AsyncTicket ticketId) {
435         synchronized (allProcesses) {
436             ProcessWrapper wrapper = getProcessWrapper(ticketId);
437
438             if (wrapper == null) {
439                 log.warn("Error getting process wrapper for ticket: " +
440                         ticketId.toString());
441                 return null;
442             }
443
444             if (wrapper.getResult().getStatusCode() ==
445                     AsyncProcessResult.STATUS_COMPLETE) {
446                 allProcesses.remove(ticketId);
447             }
448
449             return wrapper.getResult();
450         }
451     }
452
453
454     /**
455      * Retrieve the result of the process waiting up to a specified time for the
456      * process to complete
457      *
458      * @param ticketId the ticket ID
459      * @param waitTimeout the time in ms to wait for the process to complete before
460      * returning.
461      * @return AsyncProcessResult
462      */

463     public AsyncProcessResult getProcessResult(AsyncTicket ticketId,
464                                                long waitTimeout) {
465         ProcessWrapper wrapper;
466
467         synchronized (allProcesses) {
468             wrapper = getProcessWrapper(ticketId);
469
470             if (wrapper == null) {
471                 log.warn("Didn't get a process warpper for ticket: " +
472                         ticketId.toString());
473                 return null;
474             }
475
476             if (wrapper.getResult().getStatusCode() ==
477                     AsyncProcessResult.STATUS_COMPLETE) {
478                 allProcesses.remove(ticketId);
479                 return wrapper.getResult();
480             }
481         }
482
483         AsyncProcess process = wrapper.getWrappedObject();
484         synchronized (process) {
485             try {
486                 process.wait(waitTimeout);
487             } catch (InterruptedException JavaDoc ex) {
488                 log.info("Interrupted while waiting for process", ex);
489             }
490         }
491
492         if (wrapper.getResult().getStatusCode()
493                 == AsyncProcessResult.STATUS_COMPLETE) {
494             allProcesses.remove(ticketId);
495         }
496
497         return wrapper.getResult();
498     }
499
500 }
501
Popular Tags