KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jacorb > trading > impl > OfferEvaluator


1
2 // Copyright (C) 1998-1999
3
// Object Oriented Concepts, Inc.
4

5 // **********************************************************************
6
//
7
// Copyright (c) 1997
8
// Mark Spruiell (mark@intellisoft.com)
9
//
10
// See the COPYING file for more information
11
//
12
// **********************************************************************
13

14 package org.jacorb.trading.impl;
15
16 import java.util.*;
17 import org.omg.CORBA.*;
18 import org.omg.CosTrading.*;
19 import org.omg.CosTrading.LookupPackage.*;
20 import org.omg.CosTrading.RegisterPackage.*;
21 import org.omg.CosTrading.ProxyPackage.*;
22 import org.omg.CosTradingDynamic.*;
23 import org.jacorb.trading.constraint.Constraint;
24 import org.jacorb.trading.util.*;
25
26 /**
27  * Used to receive offers after they've been evaluated
28  */

29 interface OfferListener
30 {
31     /**
32      * Called when a matching offer has been found; returns true if
33      * more offers can be accepted, false otherwise
34      */

35     public boolean offerNotify(SourceAdapter source);
36     
37     /**
38      * Informs the listener that a source has been processed
39      */

40     public void sourceNotify(SourceAdapter source);
41 }
42
43 /**
44  * This class is responsible for evaluating all "local" service offers.
45  * Two queues are used to feed service offers requiring special handling
46  * (i.e. offers containing dynamic properties and proxy offers) to
47  * independent threads for simultaneous evaluation.
48  *
49  * The final result is a Vector of SourceAdapter objects that have
50  * matched the constraint.
51  */

52 public class OfferEvaluator
53     implements Runnable JavaDoc, OfferListener
54 {
55
56
57     // the maxmimum number of helper threads created to evaluate offers
58
private static final int MAX_DYNAMIC_THREADS = 10;
59   private static final int MAX_PROXY_THREADS = 10;
60
61   private String JavaDoc m_type;
62   private Constraint m_constraint;
63   private String JavaDoc m_preference;
64   private org.omg.CosTrading.Policy[] m_policies;
65   private SpecifiedProps m_desiredProps;
66   private Vector m_sources;
67   private int m_matchCard;
68   private int m_matchCount;
69   private Vector m_results;
70   private int m_numProcessed;
71   private int m_threadPriority;
72   private int m_numDynamicThreads;
73   private int m_numProxyThreads;
74   private MessageQueue m_proxyQueue;
75   private MessageQueue m_dynamicQueue;
76
77
78   public OfferEvaluator(
79     String JavaDoc type,
80     Constraint constraint,
81     String JavaDoc preference,
82     org.omg.CosTrading.Policy[] policies,
83     SpecifiedProps desiredProps,
84     Vector sources,
85     int matchCard)
86   {
87       // save arguments
88
m_type = type;
89     m_constraint = constraint;
90     m_preference = preference;
91     m_policies = policies;
92     m_desiredProps = desiredProps;
93     m_sources = sources;
94     m_matchCard = matchCard;
95
96     m_matchCount = 0;
97     m_results = new Vector();
98     m_numProcessed = 0;
99     m_threadPriority = Thread.currentThread().getPriority();
100     if( m_threadPriority < Thread.MAX_PRIORITY )
101     m_threadPriority++;
102
103     m_numDynamicThreads = 0;
104     m_numProxyThreads = 0;
105     m_proxyQueue = new MessageQueue();
106     m_dynamicQueue = new MessageQueue();
107
108       // start a thread to evaluate the offers
109
new Thread JavaDoc(this).start();
110   }
111
112
113   public synchronized Vector getResults()
114   {
115     while (! getDone()) {
116       try {
117         wait(); // wait until we are notified and check again
118
}
119       catch (InterruptedException JavaDoc e) {
120       }
121     }
122
123     return m_results;
124   }
125
126
127   public synchronized boolean getDone()
128   {
129       // we're done when we've considered all the offers or
130
// when m_matchCount == m_matchCard
131
boolean result =
132       (m_matchCard == m_matchCount || m_numProcessed == m_sources.size());
133
134     return result;
135   }
136
137
138   /**
139    * Inherited from Listener; called when an offer has been evaluated
140    */

141   public synchronized boolean offerNotify(SourceAdapter source)
142   {
143       // add source to the results
144
addSource(source);
145
146     if (getDone())
147       notifyAll();
148
149       // return true if we can accept more offers
150
return (! getDone());
151   }
152
153
154   /**
155    * Inherited from Listener; called when a source has been processed
156    */

157   public synchronized void sourceNotify(SourceAdapter source)
158   {
159       // we keep track of the number of original SourceAdapter objects
160
// we've processed, so that we know when we've evaluated all of
161
// them; a single SourceAdapter object may result in numerous
162
// calls to offerNotify (for proxy offers)
163
m_numProcessed++;
164
165     if (getDone())
166       notifyAll();
167   }
168
169
170   /**
171    * The main dispatch thread; we iterate over all of the given sources,
172    * dispatching them to the appropriate thread queue or evaluating the
173    * simple offers immediately.
174    *
175    * Offers with dynamic properties and proxy offers are handled in separate
176    * threads. We give a higher priority to these threads; if we didn't, the
177    * loop in this method might starve the threads that are evaluating offers,
178    * forcing evaluation to occur after all other offers. We at least want to
179    * give the threads a chance of being included, if our match cardinality
180    * is limited.
181    */

182   public void run() // cannot be synchronized
183
{
184     Enumeration e = m_sources.elements();
185     while (e.hasMoreElements() && ! getDone()) {
186       SourceAdapter source = (SourceAdapter)e.nextElement();
187
188       if (source instanceof ProxySourceAdapter)
189         scheduleProxy(source);
190       else if (PropUtil.hasDynamicProperties(source.getProperties()))
191         scheduleDynamic(source);
192       else { // simple offer
193
if (m_constraint.evaluate(source))
194           offerNotify(source);
195         sourceNotify(source);
196       }
197     }
198
199       // wait until we're done, and then deactivate the queues
200
synchronized (this) {
201       while (! getDone()) {
202         try {
203           wait(); // wait until we are notified and check again
204
}
205         catch (InterruptedException JavaDoc ex) {
206         }
207       }
208
209         // deactivating the queues signals the threads to terminate
210
m_dynamicQueue.deactivate();
211       m_proxyQueue.deactivate();
212     }
213   }
214
215
216   /**
217    * The source's offer has dynamic properties, so schedule a
218    * thread to evaluate it
219    */

220   protected synchronized void scheduleDynamic(SourceAdapter source)
221   {
222       // create new DynEval objects until we reach our maximum
223
if (m_numDynamicThreads < MAX_DYNAMIC_THREADS) {
224       DynEval d =
225         new DynEval(m_dynamicQueue, m_constraint, m_desiredProps, this);
226       m_numDynamicThreads++;
227       d.setPriority(m_threadPriority);
228       d.start();
229     }
230
231       // let the next available thread process this offer
232
m_dynamicQueue.enqueue(source);
233   }
234
235
236   /** The source is a proxy offer, so schedule a helper thread to evaluate it */
237   protected synchronized void scheduleProxy(SourceAdapter source)
238   {
239       // create new ProxyEval objects until we reach our maximum
240
if (m_numProxyThreads < MAX_PROXY_THREADS) {
241       ProxyEval p = new ProxyEval(m_proxyQueue, m_type, m_constraint,
242         m_preference, m_policies, m_desiredProps, this);
243       m_numProxyThreads++;
244       p.setPriority(m_threadPriority);
245       p.start();
246     }
247
248       // let the next available thread process this offer
249
m_proxyQueue.enqueue(source);
250   }
251
252
253   /** Adds another match to our list of results */
254   protected synchronized void addSource(SourceAdapter source)
255   {
256       // we may be restricted by m_matchCard, therefore we check
257
// to make sure we're not done before adding more matching
258
// offers
259
if (m_matchCount < m_matchCard) {
260       m_results.addElement(source);
261       m_matchCount++;
262     }
263   }
264
265
266
267   /**
268    * DynEval is responsible for evaluating offers with dynamic properties
269    */

270   protected static class DynEval extends Thread JavaDoc
271   {
272     private MessageQueue m_queue;
273     private Constraint m_constraint;
274     private SpecifiedProps m_desiredProps;
275     private OfferListener m_listener;
276
277
278     public DynEval(
279       MessageQueue queue,
280       Constraint constraint,
281       SpecifiedProps desiredProps,
282       OfferListener listener)
283     {
284       m_queue = queue;
285       m_constraint = constraint;
286       m_desiredProps = desiredProps;
287       m_listener = listener;
288     }
289
290
291     /**
292      * We remove offers from our queue and evaluate them against the
293      * constraint until the queue is deactivated
294      */

295     public void run()
296     {
297       SourceAdapter source;
298
299       while ((source = (SourceAdapter)m_queue.dequeue()) != null) {
300           // first evaluate the constraint with this source
301

302         if (m_constraint.evaluate(source)) {
303           Property[] props = source.getProperties();
304
305             // the constraint succeeded, now we need to ensure that
306
// the SourceAdapter has evaluated all of the "desired"
307
// properties
308

309           if (m_desiredProps.discriminator() == HowManyProps.all) {
310             for (int i = 0; i < props.length; i++)
311               source.getPropertyValue(props[i].name);
312           }
313           else if (m_desiredProps.discriminator() == HowManyProps.some) {
314             String JavaDoc[] names = m_desiredProps.prop_names();
315             for (int i = 0; i < names.length; i++)
316               source.getPropertyValue(names[i]);
317           }
318
319           m_listener.offerNotify(source);
320         }
321
322         m_listener.sourceNotify(source);
323       }
324     }
325   }
326
327
328
329   /**
330    * ProxyEval is responsible for evaluating proxy offers
331    */

332   protected static class ProxyEval extends Thread JavaDoc
333   {
334     private MessageQueue m_queue;
335     private String JavaDoc m_type;
336     private Constraint m_constraint;
337     private String JavaDoc m_preference;
338     private org.omg.CosTrading.Policy[] m_policies;
339     private SpecifiedProps m_desiredProps;
340     private OfferListener m_listener;
341
342
343     public ProxyEval(
344       MessageQueue queue,
345       String JavaDoc type,
346       Constraint constraint,
347       String JavaDoc preference,
348       org.omg.CosTrading.Policy[] policies,
349       SpecifiedProps desiredProps,
350       OfferListener listener)
351     {
352       m_queue = queue;
353       m_type = type;
354       m_constraint = constraint;
355       m_preference = preference;
356       m_policies = policies;
357       m_desiredProps = desiredProps;
358       m_listener = listener;
359     }
360
361
362     /**
363      * We remove offers from our queue and evaluate them
364      * until the queue is deactivated
365      */

366     public void run()
367     {
368       ProxySourceAdapter source;
369
370       while ((source = (ProxySourceAdapter)m_queue.dequeue()) != null) {
371         ProxyInfo info = source.getInfo();
372
373         boolean match = false;
374
375           // if if_match_all is true, then type conformance is all that
376
// is required for this proxy offer to be considered a match
377
if (info.if_match_all)
378           match = true;
379         else // otherwise evaluate the offer against the constraint
380
match = m_constraint.evaluate(source);
381
382         if (match) {
383             // rewrite the constraint; this may fail if the constraint
384
// uses a dynamic property for which a value could not be
385
// obtained
386
String JavaDoc primary = m_constraint.getConstraint();
387           String JavaDoc constraint = Recipe.rewrite(info.recipe, source, primary);
388
389           if (constraint != null) {
390               // build a new array of policies, appending the "policies
391
// to pass on"
392
org.omg.CosTrading.Policy[] policies =
393               new org.omg.CosTrading.Policy[
394                 m_policies.length + info.policies_to_pass_on.length];
395
396             int count = 0;
397             while (count < m_policies.length) {
398               policies[count] = m_policies[count];
399               count++;
400             }
401
402             for (int i = 0; i < info.policies_to_pass_on.length; i++)
403               policies[count++] = info.policies_to_pass_on[i];
404
405             try {
406               OfferSeqHolder offers = new OfferSeqHolder();
407               OfferIteratorHolder iter = new OfferIteratorHolder();
408               PolicyNameSeqHolder limits = new PolicyNameSeqHolder();
409
410                 // call the target trader
411
info.target.query(m_type, constraint, m_preference, policies,
412                 m_desiredProps, 0, offers, iter, limits);
413
414                 // process any offers we received
415
if (iter.value != null) {
416                 OfferSeqHolder seq = new OfferSeqHolder();
417                 boolean more;
418                 do {
419                   more = iter.value.next_n(20, seq);
420                   for (int i = 0; i < seq.value.length; i++) {
421                     Offer o = seq.value[i];
422                       // create a new SourceAdapter object for each offer
423
SourceAdapter src =
424                       new SourceAdapter(o.reference, o.properties);
425
426                     if (! m_listener.offerNotify(src))
427                       break;
428                   }
429                 }
430                 while (more);
431
432                 iter.value.destroy();
433               }
434             }
435             catch (org.omg.CORBA.UserException JavaDoc e) {
436               // ignore
437
}
438             catch (org.omg.CORBA.SystemException JavaDoc e) {
439               // ignore
440
}
441           }
442         }
443
444           // we've processed the source
445
m_listener.sourceNotify(source);
446       }
447     }
448   }
449 }
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
Popular Tags