KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > objectweb > joram > mom > dest > LoadingFactor


1 /*
2  * JORAM: Java(TM) Open Reliable Asynchronous Messaging
3  * Copyright (C) 2004 - 2007 ScalAgent Distributed Technologies
4  * Copyright (C) 2004 France Telecom R&D
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Lesser General Public
8  * License as published by the Free Software Foundation; either
9  * version 2.1 of the License, or any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14  * Lesser General Public License for more details.
15  *
16  * You should have received a copy of the GNU Lesser General Public
17  * License along with this library; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
19  * USA.
20  *
21  * Initial developer(s): ScalAgent Distributed Technologies
22  * Contributor(s):
23  */

24 package org.objectweb.joram.mom.dest;
25
26 import java.io.Serializable JavaDoc;
27 import java.util.Enumeration JavaDoc;
28 import java.util.Hashtable JavaDoc;
29 import java.util.Vector JavaDoc;
30
31 import org.objectweb.joram.mom.notifications.LBMessageGive;
32 import org.objectweb.joram.mom.notifications.LBMessageHope;
33 import org.objectweb.joram.shared.JoramTracing;
34 import org.objectweb.util.monolog.api.BasicLevel;
35
36 import fr.dyade.aaa.agent.AgentId;
37
38 public class LoadingFactor implements Serializable JavaDoc {
39
40   public static class Status {
41     public final static int INIT = 0;
42     public final static int RUN = 1;
43     public final static int WAIT = 2;
44     
45     public final static String JavaDoc[] names = {"INIT", "RUN", "WAIT"};
46   }
47
48   public static class ConsumerStatus {
49     public final static int CONSUMER_NO_ACTIVITY = 0;
50     public final static int CONSUMER_HIGH_ACTIVITY = 1;
51     public final static int CONSUMER_NORMAL_ACTIVITY = 2;
52     
53     public final static String JavaDoc[] names =
54     {"CONSUMER_NO_ACTIVITY",
55      "CONSUMER_HIGH_ACTIVITY",
56      "CONSUMER_NORMAL_ACTIVITY"};
57   }
58   
59   public static class ProducerStatus {
60     public final static int PRODUCER_NO_ACTIVITY = 0;
61     public final static int PRODUCER_HIGH_ACTIVITY = 1;
62     public final static int PRODUCER_NORMAL_ACTIVITY = 2;
63     
64     public final static String JavaDoc[] names =
65     {"PRODUCER_NO_ACTIVITY",
66      "PRODUCER_HIGH_ACTIVITY",
67      "PRODUCER_NORMAL_ACTIVITY"};
68   }
69   
70   /** status */
71   private int status;
72   /** status time */
73   private long statusTime;
74
75   /** consumer status */
76   private int consumerStatus = 0;
77   /** producer status */
78   private int producerStatus = 0;
79
80   /** reference to clusterQueueImpl */
81   public ClusterQueueImpl clusterQueueImpl;
82   /** producer threshold */
83   public int producThreshold = -1;
84   /** consumer threshold */
85   public int consumThreshold = -1;
86   /** automatic eval threshold */
87   public boolean autoEvalThreshold = false;
88   /** validity period */
89   public long validityPeriod = -1;
90
91   private float rateOfFlow;
92   private boolean overLoaded;
93   private int nbOfPendingMessages;
94   private int nbOfPendingRequests;
95
96   public LoadingFactor(ClusterQueueImpl clusterQueueImpl,
97                        int producThreshold,
98                        int consumThreshold,
99                        boolean autoEvalThreshold,
100                        long validityPeriod) {
101     this.clusterQueueImpl = clusterQueueImpl;
102     this.producThreshold = producThreshold;
103     this.consumThreshold = consumThreshold;
104     this.autoEvalThreshold = autoEvalThreshold;
105     this.validityPeriod = validityPeriod;
106     rateOfFlow = 1;
107     status = 0;
108   }
109
110   public void setRateOfFlow(float rateOfFlow) {
111     this.rateOfFlow = rateOfFlow;
112   }
113
114   public float getRateOfFlow() {
115     return rateOfFlow;
116   }
117
118   public void setWait() {
119     status = Status.WAIT;
120     statusTime = System.currentTimeMillis() + validityPeriod;
121   }
122
123   /**
124    * this method eval the activity
125    * of consumer and producer.
126    */

127   private void evalActivity() {
128     if (nbOfPendingMessages == 0)
129       producerStatus = ProducerStatus.PRODUCER_NO_ACTIVITY;
130     else if (nbOfPendingMessages > producThreshold)
131       producerStatus = ProducerStatus.PRODUCER_HIGH_ACTIVITY;
132     else
133       producerStatus = ProducerStatus.PRODUCER_NORMAL_ACTIVITY;
134
135     if (nbOfPendingRequests == 0)
136       consumerStatus = ConsumerStatus.CONSUMER_NO_ACTIVITY;
137     else if (nbOfPendingRequests > consumThreshold)
138       consumerStatus = ConsumerStatus.CONSUMER_HIGH_ACTIVITY;
139     else
140       consumerStatus = ConsumerStatus.CONSUMER_NORMAL_ACTIVITY;
141   }
142
143   /**
144    * update the threshol if autoEvalThreshold is true.
145    */

146   private void updateThreshol() {
147     if (autoEvalThreshold) {
148
149       if (JoramTracing.dbgDestination.isLoggable(BasicLevel.DEBUG))
150         JoramTracing.dbgDestination.log(BasicLevel.DEBUG,
151                                       "LoadingFactor.updateThreshol before" +
152                                       " rateOfFlow=" + rateOfFlow +
153                                       ", producThreshold=" + producThreshold +
154                                       ", consumThreshold=" + consumThreshold );
155
156       int deltaProd;
157       int deltaCons;
158
159       if (rateOfFlow < 1) {
160         deltaProd = (int) ((nbOfPendingMessages - producThreshold) * rateOfFlow);
161         deltaCons = (int) ((nbOfPendingRequests - consumThreshold) * rateOfFlow);
162       } else {
163         deltaProd = (int) ((nbOfPendingMessages - producThreshold) / rateOfFlow);
164         deltaCons = (int) ((nbOfPendingRequests - consumThreshold) / rateOfFlow);
165       }
166
167       if (nbOfPendingMessages > 0) {
168         if (deltaProd < producThreshold)
169           producThreshold = producThreshold + deltaProd;
170         else
171           producThreshold = deltaProd;
172       }
173
174       if (nbOfPendingRequests > 0) {
175         if (deltaCons < consumThreshold)
176           consumThreshold = consumThreshold + deltaCons;
177         else
178           consumThreshold = deltaCons;
179       }
180
181       if (JoramTracing.dbgDestination.isLoggable(BasicLevel.DEBUG))
182         JoramTracing.dbgDestination.log(BasicLevel.DEBUG,
183                                       "LoadingFactor.updateThreshol after" +
184                                       " rateOfFlow=" + rateOfFlow +
185                                       ", producThreshold=" + producThreshold +
186                                       ", consumThreshold=" + consumThreshold );
187     }
188   }
189
190   /**
191    * eval the rate of flow (means).
192    * if rateOfFlow > 1 the queue are more pending requests
193    * than pending messages.
194    * else if rateOfFlow < 1 the queue are more pending messages
195    * than pending requests.
196    * This value is set in all QueueClusterNot notification.
197    *
198    * @param pendingMessages
199    * @param pendingRequests
200    * @return
201    */

202   public float evalRateOfFlow(int pendingMessages,
203                               int pendingRequests) {
204     float currentROF;
205     nbOfPendingMessages = pendingMessages;
206     nbOfPendingRequests = pendingRequests;
207
208     if (pendingMessages == 0 && pendingRequests == 0)
209       currentROF = 1;
210     else if (pendingMessages == 0 && pendingRequests != 0)
211       currentROF = pendingRequests + 1;
212     else
213       currentROF =
214         new Float JavaDoc(pendingRequests).floatValue() /
215         new Float JavaDoc(pendingMessages).floatValue();
216     
217     rateOfFlow = (currentROF + rateOfFlow ) / 2;
218
219     if (JoramTracing.dbgDestination.isLoggable(BasicLevel.DEBUG))
220       JoramTracing.dbgDestination.log(BasicLevel.DEBUG,
221                                     "LoadingFactor.evalRateOfFlow" +
222                                     " pendingMessages = " + pendingMessages +
223                                     ", pendingRequests = " + pendingRequests +
224                                     ", rateOfFlow = " + rateOfFlow +
225                                     ", currentROF = " + currentROF);
226
227     return rateOfFlow;
228   }
229
230   /**
231    * this method eval the rate of flow and activity.
232    * if necessary send "give or hope" messages, and
233    * update threshol.
234    *
235    * @param clusters
236    * @param pendingMessages
237    * @param pendingRequests
238    */

239   public void factorCheck(Hashtable JavaDoc clusters,
240                           int pendingMessages,
241                           int pendingRequests) {
242
243     nbOfPendingMessages = pendingMessages;
244     nbOfPendingRequests = pendingRequests;
245
246     if (status == Status.WAIT &&
247         statusTime < System.currentTimeMillis())
248       status = Status.RUN;
249     
250
251     if (JoramTracing.dbgDestination.isLoggable(BasicLevel.DEBUG))
252       JoramTracing.dbgDestination.log(BasicLevel.DEBUG,
253                                     ">> LoadingFactor.factorCheck " +
254                                     this + "\nclusters = " + clusters);
255
256     evalRateOfFlow(pendingMessages,
257                    pendingRequests);
258
259     evalActivity();
260
261     if ( status == Status.INIT || status == Status.RUN) {
262       if (isOverloaded()) {
263         dispatchAndSendTo(clusters,
264                           pendingMessages,
265                           pendingRequests);
266         status = Status.WAIT;
267         statusTime = System.currentTimeMillis() + validityPeriod;
268       }
269     }
270
271     updateThreshol();
272
273     if (JoramTracing.dbgDestination.isLoggable(BasicLevel.DEBUG))
274       JoramTracing.dbgDestination.log(BasicLevel.DEBUG,
275                                     "<< LoadingFactor.factorCheck "
276                                     + this);
277   }
278
279   /**
280    * true if cluster queue is overloaded.
281    * depends on activity.
282    *
283    * @return true if cluster queue is overloaded.
284    */

285   public boolean isOverloaded() {
286     overLoaded = false;
287     if ((consumerStatus ==
288          ConsumerStatus.CONSUMER_HIGH_ACTIVITY) ||
289         (producerStatus ==
290          ProducerStatus.PRODUCER_HIGH_ACTIVITY))
291       overLoaded = true;
292     
293     if (JoramTracing.dbgDestination.isLoggable(BasicLevel.DEBUG))
294       JoramTracing.dbgDestination.log(BasicLevel.DEBUG,
295                                     "LoadingFactor.isOverloaded "
296                                     + overLoaded);
297     return overLoaded;
298   }
299
300   /**
301    * use to dispatch request hope or give messages
302    * in clusters.
303    *
304    * @param clusters
305    * @param nbOfPendingMessages
306    * @param nbOfPendingRequests
307    */

308   private void dispatchAndSendTo(Hashtable JavaDoc clusters,
309                                  int nbOfPendingMessages,
310                                  int nbOfPendingRequests) {
311     int nbMsgHope = -1;
312     int nbMsgGive = -1;
313     
314     if ((consumerStatus == ConsumerStatus.CONSUMER_NO_ACTIVITY) &&
315         (producerStatus == ProducerStatus.PRODUCER_NO_ACTIVITY))
316       return;
317     
318     if (producThreshold < nbOfPendingMessages)
319       nbMsgGive = nbOfPendingMessages - producThreshold;
320
321     if (consumThreshold < nbOfPendingRequests)
322       nbMsgHope = nbOfPendingRequests;
323
324 // if (nbOfPendingRequests > nbOfPendingMessages)
325
// nbMsgHope = nbOfPendingRequests - nbOfPendingMessages;
326
// else
327
// nbMsgGive = nbOfPendingMessages - nbOfPendingRequests;
328

329     if (JoramTracing.dbgDestination.isLoggable(BasicLevel.DEBUG))
330       JoramTracing.dbgDestination.log(BasicLevel.DEBUG,
331                                     "LoadingFactor.dispatchAndSendTo" +
332                                     "\nnbMsgHope=" + nbMsgHope +
333                                     ", nbMsgGive=" + nbMsgGive);
334
335
336     if (consumerStatus == ConsumerStatus.CONSUMER_HIGH_ACTIVITY)
337       processHope(nbMsgHope,clusters);
338
339     if (producerStatus == ProducerStatus.PRODUCER_HIGH_ACTIVITY)
340       processGive(nbMsgGive,clusters);
341   }
342
343   /**
344    * send nb messages on clusters.
345    *
346    * @param nbMsgGive
347    * @param clusters Hashtable of cluster Queue
348    */

349   private void processGive(int nbMsgGive, Hashtable JavaDoc clusters) {
350     if (nbMsgGive < 1) return;
351
352     // select queue in cluster who have a rateOfFlow > 1
353
Vector JavaDoc selected = new Vector JavaDoc();
354     for (Enumeration JavaDoc e = clusters.keys(); e.hasMoreElements(); ) {
355       AgentId id = (AgentId) e.nextElement();
356       if (((Float JavaDoc) clusters.get(id)).floatValue() >= 1 &&
357           !id.equals(clusterQueueImpl.destId))
358         selected.add(id);
359     }
360     
361     if (selected.size() == 0) return;
362     
363     int nbGivePerQueue = nbMsgGive / selected.size();
364     LBMessageGive msgGive = new LBMessageGive(validityPeriod, rateOfFlow);
365     
366     if (nbGivePerQueue == 0 && nbMsgGive > 0) {
367       // send all to the first element of clusterQueue.
368
AgentId id = (AgentId) selected.get(0);
369       if (JoramTracing.dbgDestination.isLoggable(BasicLevel.DEBUG))
370         JoramTracing.dbgDestination.log(BasicLevel.DEBUG,
371                                       "LoadingFactor.processGive" +
372                                       " nbMsgGive = " + nbMsgGive +
373                                       ", id = " + id);
374       msgGive.setClientMessages(clusterQueueImpl.getClientMessages(nbMsgGive, null, true));
375       clusterQueueImpl.forward(id,msgGive);
376     } else {
377       // dispatch to cluster.
378
if (JoramTracing.dbgDestination.isLoggable(BasicLevel.DEBUG))
379         JoramTracing.dbgDestination.log(BasicLevel.DEBUG,
380                                       "LoadingFactor.processGive" +
381                                       " givePerQueue = " + nbGivePerQueue +
382                                       ", selected = " + selected);
383       for (Enumeration JavaDoc e = selected.elements(); e.hasMoreElements(); ) {
384         AgentId id = (AgentId) e.nextElement();
385         msgGive.setClientMessages(clusterQueueImpl.getClientMessages(nbGivePerQueue, null, true));
386         clusterQueueImpl.forward(id,msgGive);
387       }
388     }
389   }
390
391  /**
392   * send a hope request on a cluster queue.
393   *
394   * @param nbMsgHope
395   * @param clusters Hashtable of cluster Queue
396   */

397   private void processHope(int nbMsgHope, Hashtable JavaDoc clusters) {
398     if (JoramTracing.dbgDestination.isLoggable(BasicLevel.DEBUG))
399       JoramTracing.dbgDestination.log(BasicLevel.DEBUG,
400                                     "LoadingFactor.processHope" +
401                                     " nbMsgHope = " + nbMsgHope);
402     if (nbMsgHope < 1) return;
403
404     Vector JavaDoc selected = new Vector JavaDoc();
405     for (Enumeration JavaDoc e = clusters.keys(); e.hasMoreElements(); ) {
406       AgentId id = (AgentId) e.nextElement();
407       if (((Float JavaDoc) clusters.get(id)).floatValue() <= 1 &&
408           !id.equals(clusterQueueImpl.destId))
409         selected.add(id);
410     }
411
412     if (selected.size() == 0) return;
413
414     int nbHopePerQueue = nbMsgHope / selected.size();
415     if (nbHopePerQueue == 0 && nbMsgHope > 0) {
416       // send the hope request to the first element of clusterQueue.
417
AgentId id = (AgentId) selected.get(0);
418       
419       if (JoramTracing.dbgDestination.isLoggable(BasicLevel.DEBUG))
420         JoramTracing.dbgDestination.log(BasicLevel.DEBUG,
421                                       "LoadingFactor.processHope" +
422                                       " nbMsgHope = " + nbMsgHope +
423                                       ", id = " + id);
424       LBMessageHope msgHope = new LBMessageHope(validityPeriod,rateOfFlow);
425       msgHope.setNbMsg(nbMsgHope);
426       clusterQueueImpl.forward(id,msgHope);
427       
428     } else {
429       // dispatch the hope request to clusterQueue.
430
if (JoramTracing.dbgDestination.isLoggable(BasicLevel.DEBUG))
431         JoramTracing.dbgDestination.log(BasicLevel.DEBUG,
432                                       "LoadingFactor.processHope" +
433                                       " hopePerQueue = " + nbHopePerQueue +
434                                       ", selected = " + selected);
435       
436       LBMessageHope msgHope = new LBMessageHope(validityPeriod,rateOfFlow);
437       for (Enumeration JavaDoc e = selected.elements(); e.hasMoreElements(); ) {
438         AgentId id = (AgentId) e.nextElement();
439         msgHope.setNbMsg(nbHopePerQueue);
440         clusterQueueImpl.forward(id,msgHope);
441       }
442     }
443   }
444
445   public String JavaDoc toString() {
446     StringBuffer JavaDoc str = new StringBuffer JavaDoc();
447     str.append("LoadingFactor (status=");
448     str.append(Status.names[status]);
449     str.append(", consumerStatus=");
450     str.append(ConsumerStatus.names[consumerStatus]);
451     str.append(", producerStatus=");
452     str.append(ProducerStatus.names[producerStatus]);
453     str.append(", producThreshold=");
454     str.append(producThreshold);
455     str.append(", consumThreshold=");
456     str.append(consumThreshold);
457     str.append(", autoEvalThreshold=");
458     str.append(autoEvalThreshold);
459     str.append(", nbOfPendingMessages=");
460     str.append(nbOfPendingMessages);
461     str.append(", nbOfPendingRequests=");
462     str.append(nbOfPendingRequests);
463     str.append(", rateOfFlow=");
464     str.append(rateOfFlow);
465     str.append(", overLoaded=");
466     str.append(overLoaded);
467     str.append(")");
468     return str.toString();
469   }
470 }
471
Popular Tags