1 2 5 14 package org.jacorb.trading.impl; 15 16 import java.util.*; 17 import org.omg.CORBA.*; 18 import org.omg.CosTrading.*; 19 import org.omg.CosTrading.LookupPackage.*; 20 import org.omg.CosTrading.RegisterPackage.*; 21 import org.omg.CosTrading.ProxyPackage.*; 22 import org.omg.CosTradingDynamic.*; 23 import org.jacorb.trading.constraint.Constraint; 24 import org.jacorb.trading.util.*; 25 26 29 interface OfferListener 30 { 31 35 public boolean offerNotify(SourceAdapter source); 36 37 40 public void sourceNotify(SourceAdapter source); 41 } 42 43 52 public class OfferEvaluator 53 implements Runnable , OfferListener 54 { 55 56 57 private static final int MAX_DYNAMIC_THREADS = 10; 59 private static final int MAX_PROXY_THREADS = 10; 60 61 private String m_type; 62 private Constraint m_constraint; 63 private String m_preference; 64 private org.omg.CosTrading.Policy[] m_policies; 65 private SpecifiedProps m_desiredProps; 66 private Vector m_sources; 67 private int m_matchCard; 68 private int m_matchCount; 69 private Vector m_results; 70 private int m_numProcessed; 71 private int m_threadPriority; 72 private int m_numDynamicThreads; 73 private int m_numProxyThreads; 74 private MessageQueue m_proxyQueue; 75 private MessageQueue m_dynamicQueue; 76 77 78 public OfferEvaluator( 79 String type, 80 Constraint constraint, 81 String preference, 82 org.omg.CosTrading.Policy[] policies, 83 SpecifiedProps desiredProps, 84 Vector sources, 85 int matchCard) 86 { 87 m_type = type; 89 m_constraint = constraint; 90 m_preference = preference; 91 m_policies = policies; 92 m_desiredProps = desiredProps; 93 m_sources = sources; 94 m_matchCard = matchCard; 95 96 m_matchCount = 0; 97 m_results = new Vector(); 98 m_numProcessed = 0; 99 m_threadPriority = Thread.currentThread().getPriority(); 100 if( m_threadPriority < Thread.MAX_PRIORITY ) 101 m_threadPriority++; 102 103 m_numDynamicThreads = 0; 104 m_numProxyThreads = 0; 105 m_proxyQueue = new MessageQueue(); 106 m_dynamicQueue = new MessageQueue(); 107 108 new Thread (this).start(); 110 } 111 112 113 public synchronized Vector getResults() 114 { 115 while (! getDone()) { 116 try { 117 wait(); } 119 catch (InterruptedException e) { 120 } 121 } 122 123 return m_results; 124 } 125 126 127 public synchronized boolean getDone() 128 { 129 boolean result = 132 (m_matchCard == m_matchCount || m_numProcessed == m_sources.size()); 133 134 return result; 135 } 136 137 138 141 public synchronized boolean offerNotify(SourceAdapter source) 142 { 143 addSource(source); 145 146 if (getDone()) 147 notifyAll(); 148 149 return (! getDone()); 151 } 152 153 154 157 public synchronized void sourceNotify(SourceAdapter source) 158 { 159 m_numProcessed++; 164 165 if (getDone()) 166 notifyAll(); 167 } 168 169 170 182 public void run() { 184 Enumeration e = m_sources.elements(); 185 while (e.hasMoreElements() && ! getDone()) { 186 SourceAdapter source = (SourceAdapter)e.nextElement(); 187 188 if (source instanceof ProxySourceAdapter) 189 scheduleProxy(source); 190 else if (PropUtil.hasDynamicProperties(source.getProperties())) 191 scheduleDynamic(source); 192 else { if (m_constraint.evaluate(source)) 194 offerNotify(source); 195 sourceNotify(source); 196 } 197 } 198 199 synchronized (this) { 201 while (! getDone()) { 202 try { 203 wait(); } 205 catch (InterruptedException ex) { 206 } 207 } 208 209 m_dynamicQueue.deactivate(); 211 m_proxyQueue.deactivate(); 212 } 213 } 214 215 216 220 protected synchronized void scheduleDynamic(SourceAdapter source) 221 { 222 if (m_numDynamicThreads < MAX_DYNAMIC_THREADS) { 224 DynEval d = 225 new DynEval(m_dynamicQueue, m_constraint, m_desiredProps, this); 226 m_numDynamicThreads++; 227 d.setPriority(m_threadPriority); 228 d.start(); 229 } 230 231 m_dynamicQueue.enqueue(source); 233 } 234 235 236 237 protected synchronized void scheduleProxy(SourceAdapter source) 238 { 239 if (m_numProxyThreads < MAX_PROXY_THREADS) { 241 ProxyEval p = new ProxyEval(m_proxyQueue, m_type, m_constraint, 242 m_preference, m_policies, m_desiredProps, this); 243 m_numProxyThreads++; 244 p.setPriority(m_threadPriority); 245 p.start(); 246 } 247 248 m_proxyQueue.enqueue(source); 250 } 251 252 253 254 protected synchronized void addSource(SourceAdapter source) 255 { 256 if (m_matchCount < m_matchCard) { 260 m_results.addElement(source); 261 m_matchCount++; 262 } 263 } 264 265 266 267 270 protected static class DynEval extends Thread 271 { 272 private MessageQueue m_queue; 273 private Constraint m_constraint; 274 private SpecifiedProps m_desiredProps; 275 private OfferListener m_listener; 276 277 278 public DynEval( 279 MessageQueue queue, 280 Constraint constraint, 281 SpecifiedProps desiredProps, 282 OfferListener listener) 283 { 284 m_queue = queue; 285 m_constraint = constraint; 286 m_desiredProps = desiredProps; 287 m_listener = listener; 288 } 289 290 291 295 public void run() 296 { 297 SourceAdapter source; 298 299 while ((source = (SourceAdapter)m_queue.dequeue()) != null) { 300 302 if (m_constraint.evaluate(source)) { 303 Property[] props = source.getProperties(); 304 305 309 if (m_desiredProps.discriminator() == HowManyProps.all) { 310 for (int i = 0; i < props.length; i++) 311 source.getPropertyValue(props[i].name); 312 } 313 else if (m_desiredProps.discriminator() == HowManyProps.some) { 314 String [] names = m_desiredProps.prop_names(); 315 for (int i = 0; i < names.length; i++) 316 source.getPropertyValue(names[i]); 317 } 318 319 m_listener.offerNotify(source); 320 } 321 322 m_listener.sourceNotify(source); 323 } 324 } 325 } 326 327 328 329 332 protected static class ProxyEval extends Thread 333 { 334 private MessageQueue m_queue; 335 private String m_type; 336 private Constraint m_constraint; 337 private String m_preference; 338 private org.omg.CosTrading.Policy[] m_policies; 339 private SpecifiedProps m_desiredProps; 340 private OfferListener m_listener; 341 342 343 public ProxyEval( 344 MessageQueue queue, 345 String type, 346 Constraint constraint, 347 String preference, 348 org.omg.CosTrading.Policy[] policies, 349 SpecifiedProps desiredProps, 350 OfferListener listener) 351 { 352 m_queue = queue; 353 m_type = type; 354 m_constraint = constraint; 355 m_preference = preference; 356 m_policies = policies; 357 m_desiredProps = desiredProps; 358 m_listener = listener; 359 } 360 361 362 366 public void run() 367 { 368 ProxySourceAdapter source; 369 370 while ((source = (ProxySourceAdapter)m_queue.dequeue()) != null) { 371 ProxyInfo info = source.getInfo(); 372 373 boolean match = false; 374 375 if (info.if_match_all) 378 match = true; 379 else match = m_constraint.evaluate(source); 381 382 if (match) { 383 String primary = m_constraint.getConstraint(); 387 String constraint = Recipe.rewrite(info.recipe, source, primary); 388 389 if (constraint != null) { 390 org.omg.CosTrading.Policy[] policies = 393 new org.omg.CosTrading.Policy[ 394 m_policies.length + info.policies_to_pass_on.length]; 395 396 int count = 0; 397 while (count < m_policies.length) { 398 policies[count] = m_policies[count]; 399 count++; 400 } 401 402 for (int i = 0; i < info.policies_to_pass_on.length; i++) 403 policies[count++] = info.policies_to_pass_on[i]; 404 405 try { 406 OfferSeqHolder offers = new OfferSeqHolder(); 407 OfferIteratorHolder iter = new OfferIteratorHolder(); 408 PolicyNameSeqHolder limits = new PolicyNameSeqHolder(); 409 410 info.target.query(m_type, constraint, m_preference, policies, 412 m_desiredProps, 0, offers, iter, limits); 413 414 if (iter.value != null) { 416 OfferSeqHolder seq = new OfferSeqHolder(); 417 boolean more; 418 do { 419 more = iter.value.next_n(20, seq); 420 for (int i = 0; i < seq.value.length; i++) { 421 Offer o = seq.value[i]; 422 SourceAdapter src = 424 new SourceAdapter(o.reference, o.properties); 425 426 if (! m_listener.offerNotify(src)) 427 break; 428 } 429 } 430 while (more); 431 432 iter.value.destroy(); 433 } 434 } 435 catch (org.omg.CORBA.UserException e) { 436 } 438 catch (org.omg.CORBA.SystemException e) { 439 } 441 } 442 } 443 444 m_listener.sourceNotify(source); 446 } 447 } 448 } 449 } 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 | Popular Tags |