KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jacorb > trading > util > QueryPropagator


1 package org.jacorb.trading.util;
2
3 import org.apache.avalon.framework.logger.*;
4 import org.apache.avalon.framework.configuration.*;
5
6 import java.util.*;
7 import org.omg.CORBA.*;
8 import java.lang.*;
9
10 /**
11  * This class is a sort of ThreadPool. It holds a number of QueryThreads
12  * which execute the remote query()-calls. It is closely connected to the
13  * QueryContainer-Class.<p>
14  * Concurrency issues are adressed with semaphores and the producer-consumer pattern.
15  * New Threads are started if the need arises.
16  *
17  * @author Nicolas Noffke
18  */

19
20 public class QueryPropagator
21     implements Configurable
22 {
23     /** the configuration object */
24     private org.jacorb.config.Configuration configuration = null;
25     private Logger logger = null;
26
27     /**
28      * This class represents a thread wich executes the remote query()-calls.
29      * For efficiency reasons it is an inner class so we can access attributes
30      * of QueryPropagator in an easy fashion.
31      *
32      */

33
34     private class QueryThread
35         extends Thread JavaDoc
36     {
37     private QueryContainer m_query = null;
38     private int no = 0;
39     private TimeoutThread m_timer;
40     
41     /**
42      * Default constructor, nothing done here.
43      */

44     public QueryThread(TimeoutThread timer)
45         {
46         no = threadc++;
47         m_total_threads++;
48
49         m_timer = timer;
50
51         start();
52     }
53
54     /**
55      * The threads main loop.
56      */

57     public void run(){
58         do {
59         m_idle_threads++;
60         getWork();
61         executeQuery();
62         } while (true);
63     }
64     
65     /**
66      * This method executes the remote query()-call and copies the
67      * caught exceptions to the QueryContainer, if necessary.
68      */

69     private synchronized void executeQuery()
70     {
71         try
72         {
73           // setting timout, so we don't wait infinitely
74
m_timer.setTimeout(this);
75
76         m_query.m_target.query(m_query.m_type,
77                        m_query.m_constr,
78                        m_query.m_pref,
79                        m_query.m_policies,
80                        m_query.m_desired_props,
81                        m_query.m_how_many,
82                        m_query.m_offers,
83                        m_query.m_offer_itr,
84                        m_query.m_limits_applied);
85         m_timer.stopTimer(this);
86         }
87         catch (UserException e)
88         {
89         m_query.m_exception = e;
90
91         // initializing anyway, for safety reasons
92
m_query.m_offers.value = new org.omg.CosTrading.Offer[0];
93         m_query.m_limits_applied.value = new String JavaDoc[0];
94         }
95         catch (Exception JavaDoc e)
96         {
97                 // org.jacorb.util.Debug.output(2, e);
98

99         // initializing anyway, for safety reasons
100
m_query.m_offers.value = new org.omg.CosTrading.Offer[0];
101         m_query.m_limits_applied.value = new String JavaDoc[0];
102         }
103
104         m_query.m_mutex.V(); // releasing lock on QueryContainer, i.e. resultReady() unblocks
105
m_query = null;
106     }
107     
108
109     /**
110      * Wait for a new QueryContainer to arrive, i.e. be passed in by putWork().
111      * It uses two semaphores and the consumer-producer pattern for concurrency
112      * control.
113      */

114     private void getWork()
115         {
116         m_query_cons.P();
117         m_query = m_new_query; // get new QueryContainer
118

119         m_idle_threads--;
120         m_query_prod.V();
121
122     }
123     } // QueryThread
124

125
126     ////////////////////// Body of QueryPropagator /////////////////////////////////////
127

128     private int m_idle_threads = 0; //stores currently idle threads
129
private int m_total_threads = 0; //no. of threads
130
private Semaphore m_idle_threads_sema; //mutex for m_idle_threads
131

132     private QueryContainer m_new_query; // new query to be sheduled
133
private Semaphore m_query_cons; // consumer mutex aka 'full'
134
private Semaphore m_query_prod; // producer mutex aka 'empty'
135

136     private int m_max_threads = 10; //if more threads than this, then we don't
137
//start a new thread, even if none are idle
138
private int m_min_threads = 5; // if less than this, then we create a new thread
139
private int m_query_timeout = 60000; // max time for a query to return;
140
private TimeoutThread m_timer = null;
141
142     private static int threadc = 0;
143     private int m_debug_verbosity = 2;
144
145     /**
146      * Constructor of QueryPropagator
147      *
148      */

149     public QueryPropagator()
150     {
151     m_idle_threads_sema = new Semaphore();
152     m_query_cons = new Semaphore(0); // setting semaphore to 0, since consumers
153
// must block until first producer issues a V()
154
m_query_prod = new Semaphore();
155
156     m_timer = new TimeoutThread(m_query_timeout);
157     }
158     
159     public void configure(Configuration myConfiguration)
160         throws ConfigurationException
161     {
162         this.configuration =
163             (org.jacorb.config.Configuration)myConfiguration;
164         logger =
165             configuration.getNamedLogger("jacorb.trading");
166
167         m_max_threads =
168             configuration.getAttributeAsInteger("jtrader.util.max_threads",10);
169         m_min_threads =
170             configuration.getAttributeAsInteger("jtrader.util.min_threads",1);
171         m_query_timeout =
172             configuration.getAttributeAsInteger("jtrader.util.query_timeout");
173     }
174
175
176
177     /**
178      * This method takes a new QueryContainer and schedules it to a QueryThread
179      * to execute the remote query. Counterpart to getWork. <br>
180      * If no threads are idle, a new one is started.
181      *
182      * @param query New QueryContainer to be executed
183      */

184     public void putWork(QueryContainer query){
185
186     boolean _none_idle;
187     if (logger.isDebugEnabled())
188         logger.debug("Put work (waiting): query(" + query.no + ")");
189     m_query_prod.P();
190     _none_idle = m_idle_threads < m_min_threads && m_total_threads < m_max_threads;
191     m_new_query = query;
192     if (logger.isDebugEnabled())
193         logger.debug("Put work (got P) query(" + m_new_query.no + ")");
194     m_query_cons.V();
195     if (logger.isDebugEnabled())
196         logger.debug("left put work: query(" + m_new_query.no + ")");
197
198     if (_none_idle){
199         // no threads idle and maximum thread count not reached, so start new one
200
if (logger.isDebugEnabled())
201         logger.debug("Not enough Threads: " + m_idle_threads);
202         QueryThread _thread = new QueryThread(m_timer);
203         // new thread will call getWork() as first action
204
}
205     }
206 } // QueryPropagator
207

208
209
210
211
212
213
Popular Tags