1 package org.jacorb.trading.util; 2 3 import org.apache.avalon.framework.logger.*; 4 import org.apache.avalon.framework.configuration.*; 5 6 import java.util.*; 7 import org.omg.CORBA.*; 8 import java.lang.*; 9 10 19 20 public class QueryPropagator 21 implements Configurable 22 { 23 24 private org.jacorb.config.Configuration configuration = null; 25 private Logger logger = null; 26 27 33 34 private class QueryThread 35 extends Thread 36 { 37 private QueryContainer m_query = null; 38 private int no = 0; 39 private TimeoutThread m_timer; 40 41 44 public QueryThread(TimeoutThread timer) 45 { 46 no = threadc++; 47 m_total_threads++; 48 49 m_timer = timer; 50 51 start(); 52 } 53 54 57 public void run(){ 58 do { 59 m_idle_threads++; 60 getWork(); 61 executeQuery(); 62 } while (true); 63 } 64 65 69 private synchronized void executeQuery() 70 { 71 try 72 { 73 m_timer.setTimeout(this); 75 76 m_query.m_target.query(m_query.m_type, 77 m_query.m_constr, 78 m_query.m_pref, 79 m_query.m_policies, 80 m_query.m_desired_props, 81 m_query.m_how_many, 82 m_query.m_offers, 83 m_query.m_offer_itr, 84 m_query.m_limits_applied); 85 m_timer.stopTimer(this); 86 } 87 catch (UserException e) 88 { 89 m_query.m_exception = e; 90 91 m_query.m_offers.value = new org.omg.CosTrading.Offer[0]; 93 m_query.m_limits_applied.value = new String [0]; 94 } 95 catch (Exception e) 96 { 97 99 m_query.m_offers.value = new org.omg.CosTrading.Offer[0]; 101 m_query.m_limits_applied.value = new String [0]; 102 } 103 104 m_query.m_mutex.V(); m_query = null; 106 } 107 108 109 114 private void getWork() 115 { 116 m_query_cons.P(); 117 m_query = m_new_query; 119 m_idle_threads--; 120 m_query_prod.V(); 121 122 } 123 } 125 126 128 private int m_idle_threads = 0; private int m_total_threads = 0; private Semaphore m_idle_threads_sema; 132 private QueryContainer m_new_query; private Semaphore m_query_cons; private Semaphore m_query_prod; 136 private int m_max_threads = 10; private int m_min_threads = 5; private int m_query_timeout = 60000; private TimeoutThread m_timer = null; 141 142 private static int threadc = 0; 143 private int m_debug_verbosity = 2; 144 145 149 public QueryPropagator() 150 { 151 m_idle_threads_sema = new Semaphore(); 152 m_query_cons = new Semaphore(0); m_query_prod = new Semaphore(); 155 156 m_timer = new TimeoutThread(m_query_timeout); 157 } 158 159 public void configure(Configuration myConfiguration) 160 throws ConfigurationException 161 { 162 this.configuration = 163 (org.jacorb.config.Configuration)myConfiguration; 164 logger = 165 configuration.getNamedLogger("jacorb.trading"); 166 167 m_max_threads = 168 configuration.getAttributeAsInteger("jtrader.util.max_threads",10); 169 m_min_threads = 170 configuration.getAttributeAsInteger("jtrader.util.min_threads",1); 171 m_query_timeout = 172 configuration.getAttributeAsInteger("jtrader.util.query_timeout"); 173 } 174 175 176 177 184 public void putWork(QueryContainer query){ 185 186 boolean _none_idle; 187 if (logger.isDebugEnabled()) 188 logger.debug("Put work (waiting): query(" + query.no + ")"); 189 m_query_prod.P(); 190 _none_idle = m_idle_threads < m_min_threads && m_total_threads < m_max_threads; 191 m_new_query = query; 192 if (logger.isDebugEnabled()) 193 logger.debug("Put work (got P) query(" + m_new_query.no + ")"); 194 m_query_cons.V(); 195 if (logger.isDebugEnabled()) 196 logger.debug("left put work: query(" + m_new_query.no + ")"); 197 198 if (_none_idle){ 199 if (logger.isDebugEnabled()) 201 logger.debug("Not enough Threads: " + m_idle_threads); 202 QueryThread _thread = new QueryThread(m_timer); 203 } 205 } 206 } 208 209 210 211 212 213 | Popular Tags |