KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > sapia > ubik > rmi > server > command > InQueue


1 package org.sapia.ubik.rmi.server.command;
2
3 import org.sapia.ubik.net.PooledThread;
4 import org.sapia.ubik.net.ThreadPool;
5 import org.sapia.ubik.rmi.server.Log;
6 import org.sapia.ubik.rmi.server.ShutdownException;
7
8
9 /**
10  * Implements the queue into which incoming <code>Executable</code>
11  * instances are inserted before being processed in separate threads.
12  * <p>
13  * The <code>Executable</code> instances are in this case expected to be
14  * <code>AsyncCommand</code> instances.
15  * <p>
16  * As an <code>AsyncCommand</code> is enqueued, an available processing thread
17  * handles the command is executed. If no thread is available, then the command
18  * sits in the queue until a thread becomes available - and until the command's
19  * turn comes - i.e.: this is a queue and commands are treated in a FIFO fashion.
20  *
21  * @author Yanick Duchesne
22  * <dl>
23  * <dt><b>Copyright:</b><dd>Copyright &#169; 2002-2003 <a HREF="http://www.sapia-oss.org">Sapia Open Source Software</a>. All Rights Reserved.</dd></dt>
24  * <dt><b>License:</b><dd>Read the license.txt file of the jar or visit the
25  * <a HREF="http://www.sapia-oss.org/license.html">license page</a> at the Sapia OSS web site</dd></dt>
26  * </dl>
27  */

28 public class InQueue extends ExecQueue {
29   CmdProcessorThreadPool _pool;
30
31   /**
32    * Creates a new instance of this clas with one internal processor thread.
33    */

34   InQueue() throws Exception JavaDoc {
35     this(1);
36   }
37
38   /**
39    * Creates a new instance of this clas with the given number of processor threads.
40    *
41    * @param maxThreads the maximum number of internal threads created by this queue.
42    */

43   InQueue(int maxThreads) throws Exception JavaDoc {
44     super();
45
46     if (maxThreads <= 0) {
47       maxThreads = 1;
48     }
49
50     _pool = new CmdProcessorThreadPool(maxThreads);
51     _pool.fill(maxThreads);
52
53     PooledThread pt;
54
55     for (int count = 0; count < maxThreads; count++) {
56       pt = (PooledThread) _pool.acquire();
57       pt.exec(this);
58     }
59   }
60
61   public void shutdown(long timeout) throws InterruptedException JavaDoc {
62     super.shutdown(timeout);
63     _pool.shutdown(timeout);
64   }
65
66   /*////////////////////////////////////////////////////////////////////
67                                INNER CLASSES
68   ////////////////////////////////////////////////////////////////////*/

69   static class CmdProcessorThread extends PooledThread {
70     /**
71      * @see org.sapia.ubik.net.PooledThread#doExec(Object)
72      */

73     protected void doExec(Object JavaDoc task) {
74       InQueue queue = (InQueue) task;
75
76       while (true) {
77         AsyncCommand async;
78         Object JavaDoc toReturn;
79
80         try {
81           async = (AsyncCommand) queue.remove();
82
83           try {
84             toReturn = async.execute();
85           } catch (ShutdownException e) {
86             Log.warning(getName(), "Shutting down...");
87
88             break;
89           } catch (Throwable JavaDoc t) {
90             toReturn = t;
91           }
92
93           OutQueue.getQueueFor(new Destination(async.getFrom(),
94               async.getCallerVmId())).add(new Response(async.getCmdId(),
95               toReturn));
96           Thread.yield();
97         } catch (InterruptedException JavaDoc e) {
98           break;
99         }
100       }
101     }
102
103     /**
104      * @see org.sapia.ubik.net.PooledThread#shutdown()
105      */

106     public void shutdown() {
107       Log.warning(getName(), "Shut down signal received...");
108       super.shutdown();
109     }
110   }
111
112   static class CmdProcessorThreadPool extends ThreadPool {
113     /**
114      * Constructor for CmdProcessorThreadPool.
115      * @param name
116      * @param maxSize
117      */

118     public CmdProcessorThreadPool(int maxSize) {
119       super("ubik.rmi.CallbackThread", true, maxSize);
120     }
121
122     /**
123      * @see org.sapia.ubik.net.ThreadPool#newThread()
124      */

125     protected PooledThread newThread() throws Exception JavaDoc {
126       return new CmdProcessorThread();
127     }
128   }
129 }
130
Popular Tags