package org.objectweb.joram.mom.dest;

import java.io.Serializable;
import java.util.Enumeration;
import java.util.Hashtable;
import java.util.Vector;

import org.objectweb.joram.mom.notifications.LBMessageGive;
import org.objectweb.joram.mom.notifications.LBMessageHope;
import org.objectweb.joram.shared.JoramTracing;
import org.objectweb.util.monolog.api.BasicLevel;

import fr.dyade.aaa.agent.AgentId;

/**
 * Load-balancing state attached to a {@link ClusterQueueImpl}.
 * <p>
 * On each {@link #factorCheck} the instance records the number of pending
 * messages and pending requests, refreshes its smoothed rate of flow,
 * classifies producer and consumer activity against the two thresholds and,
 * when either side is in high activity, asks the owning queue to forward
 * {@link LBMessageGive} / {@link LBMessageHope} notifications to other
 * queues of the cluster.
 * <p>
 * The class is {@link Serializable} and declares no explicit
 * <code>serialVersionUID</code>: the default identifier depends on the
 * non-private members, so those must stay stable to remain compatible with
 * previously serialized instances.
 */
public class LoadingFactor implements Serializable {

  /** Life-cycle states of a loading factor. */
  public static class Status {
    /** Initial state, set by the constructor. */
    public final static int INIT = 0;
    /** Dispatching is allowed. */
    public final static int RUN = 1;
    /** A dispatch was done recently; no new dispatch until the validity period elapses. */
    public final static int WAIT = 2;

    /** Printable names, indexed by state value. */
    public final static String[] names = {"INIT", "RUN", "WAIT"};
  }

  /** Consumer activity levels, derived from the number of pending requests. */
  public static class ConsumerStatus {
    public final static int CONSUMER_NO_ACTIVITY = 0;
    public final static int CONSUMER_HIGH_ACTIVITY = 1;
    public final static int CONSUMER_NORMAL_ACTIVITY = 2;

    /** Printable names, indexed by level value. */
    public final static String[] names =
      {"CONSUMER_NO_ACTIVITY",
       "CONSUMER_HIGH_ACTIVITY",
       "CONSUMER_NORMAL_ACTIVITY"};
  }

  /** Producer activity levels, derived from the number of pending messages. */
  public static class ProducerStatus {
    public final static int PRODUCER_NO_ACTIVITY = 0;
    public final static int PRODUCER_HIGH_ACTIVITY = 1;
    public final static int PRODUCER_NORMAL_ACTIVITY = 2;

    /** Printable names, indexed by level value. */
    public final static String[] names =
      {"PRODUCER_NO_ACTIVITY",
       "PRODUCER_HIGH_ACTIVITY",
       "PRODUCER_NORMAL_ACTIVITY"};
  }

  /** Current state, one of the {@link Status} values. */
  private int status;

  /** Time (ms since the epoch) after which a WAIT state falls back to RUN. */
  private long statusTime;

  /** Current consumer activity, one of the {@link ConsumerStatus} values. */
  private int consumerStatus = 0;

  /** Current producer activity, one of the {@link ProducerStatus} values. */
  private int producerStatus = 0;

  /** Queue owning this loading factor; used to fetch and forward messages. */
  public ClusterQueueImpl clusterQueueImpl;

  /** Number of pending messages above which producers are in high activity. */
  public int producThreshold = -1;

  /** Number of pending requests above which consumers are in high activity. */
  public int consumThreshold = -1;

  /** When true, both thresholds are re-evaluated at the end of each check. */
  public boolean autoEvalThreshold = false;

  /** Duration (ms) of the WAIT state; also passed to the notifications sent. */
  public long validityPeriod = -1;

  /** Smoothed ratio of pending requests to pending messages. */
  private float rateOfFlow;
  /** Result of the last {@link #isOverloaded()} evaluation. */
  private boolean overLoaded;
  /** Number of pending messages given to the last evaluation. */
  private int nbOfPendingMessages;
  /** Number of pending requests given to the last evaluation. */
  private int nbOfPendingRequests;

  /**
   * Creates a loading factor in the INIT state with a rate of flow of 1.
   *
   * @param clusterQueueImpl  queue owning this loading factor.
   * @param producThreshold   initial producer threshold.
   * @param consumThreshold   initial consumer threshold.
   * @param autoEvalThreshold true to let the thresholds adapt automatically.
   * @param validityPeriod    duration of the WAIT state, in milliseconds.
   */
  public LoadingFactor(ClusterQueueImpl clusterQueueImpl,
                       int producThreshold,
                       int consumThreshold,
                       boolean autoEvalThreshold,
                       long validityPeriod) {
    this.clusterQueueImpl = clusterQueueImpl;
    this.producThreshold = producThreshold;
    this.consumThreshold = consumThreshold;
    this.autoEvalThreshold = autoEvalThreshold;
    this.validityPeriod = validityPeriod;
    rateOfFlow = 1;
    status = Status.INIT;
  }

  /** Overrides the smoothed rate of flow. */
  public void setRateOfFlow(float rateOfFlow) {
    this.rateOfFlow = rateOfFlow;
  }

  /** Returns the smoothed rate of flow. */
  public float getRateOfFlow() {
    return rateOfFlow;
  }

  /**
   * Switches to the WAIT state; it expires <code>validityPeriod</code>
   * milliseconds from now.
   */
  public void setWait() {
    status = Status.WAIT;
    statusTime = System.currentTimeMillis() + validityPeriod;
  }

  /**
   * Classifies producer and consumer activity from the recorded pending
   * counts: none when the count is 0, high when it exceeds the matching
   * threshold, normal otherwise.
   */
  private void evalActivity() {
    if (nbOfPendingMessages == 0) {
      producerStatus = ProducerStatus.PRODUCER_NO_ACTIVITY;
    } else if (nbOfPendingMessages > producThreshold) {
      producerStatus = ProducerStatus.PRODUCER_HIGH_ACTIVITY;
    } else {
      producerStatus = ProducerStatus.PRODUCER_NORMAL_ACTIVITY;
    }

    if (nbOfPendingRequests == 0) {
      consumerStatus = ConsumerStatus.CONSUMER_NO_ACTIVITY;
    } else if (nbOfPendingRequests > consumThreshold) {
      consumerStatus = ConsumerStatus.CONSUMER_HIGH_ACTIVITY;
    } else {
      consumerStatus = ConsumerStatus.CONSUMER_NORMAL_ACTIVITY;
    }
  }

  /**
   * Re-evaluates both thresholds from the recorded pending counts and the
   * current rate of flow. Does nothing unless {@link #autoEvalThreshold}
   * is set.
   */
  private void updateThreshol() {
    if (!autoEvalThreshold) {
      return;
    }

    if (JoramTracing.dbgDestination.isLoggable(BasicLevel.DEBUG))
      JoramTracing.dbgDestination.log(BasicLevel.DEBUG,
                                      "LoadingFactor.updateThreshol before" +
                                      " rateOfFlow=" + rateOfFlow +
                                      ", producThreshold=" + producThreshold +
                                      ", consumThreshold=" + consumThreshold);

    producThreshold = adjustedThreshold(producThreshold, nbOfPendingMessages);
    consumThreshold = adjustedThreshold(consumThreshold, nbOfPendingRequests);

    if (JoramTracing.dbgDestination.isLoggable(BasicLevel.DEBUG))
      JoramTracing.dbgDestination.log(BasicLevel.DEBUG,
                                      "LoadingFactor.updateThreshol after" +
                                      " rateOfFlow=" + rateOfFlow +
                                      ", producThreshold=" + producThreshold +
                                      ", consumThreshold=" + consumThreshold);
  }

  /**
   * Computes the new value of one threshold.
   * <p>
   * The gap between the pending count and the threshold is damped by the
   * rate of flow (multiplied when the rate is below 1, divided otherwise)
   * and truncated to an int. The damped gap is added to the threshold when
   * it is smaller than the threshold, and replaces it otherwise.
   *
   * @param threshold current threshold value.
   * @param pending   pending count the threshold applies to.
   * @return the threshold unchanged when <code>pending</code> is not
   *         positive, the adjusted threshold otherwise.
   */
  private int adjustedThreshold(int threshold, int pending) {
    if (pending <= 0) {
      return threshold;
    }
    int gap = pending - threshold;
    int delta = (rateOfFlow < 1) ? (int) (gap * rateOfFlow)
                                 : (int) (gap / rateOfFlow);
    return (delta < threshold) ? threshold + delta : delta;
  }

  /**
   * Records the pending counts and refreshes the smoothed rate of flow.
   * <p>
   * The instantaneous rate is <code>pendingRequests / pendingMessages</code>;
   * it is 1 when both counts are 0 and <code>pendingRequests + 1</code> when
   * only the message count is 0. The smoothed rate becomes the mean of the
   * instantaneous rate and the previous smoothed rate.
   *
   * @param pendingMessages number of pending messages.
   * @param pendingRequests number of pending requests.
   * @return the new smoothed rate of flow.
   */
  public float evalRateOfFlow(int pendingMessages,
                              int pendingRequests) {
    nbOfPendingMessages = pendingMessages;
    nbOfPendingRequests = pendingRequests;

    float currentROF;
    if (pendingMessages == 0) {
      // No message to divide by: 1 when idle, otherwise above the request count.
      currentROF = (pendingRequests == 0) ? 1 : pendingRequests + 1;
    } else {
      // Primitive cast forces float division without boxing.
      currentROF = (float) pendingRequests / (float) pendingMessages;
    }

    rateOfFlow = (currentROF + rateOfFlow) / 2;

    if (JoramTracing.dbgDestination.isLoggable(BasicLevel.DEBUG))
      JoramTracing.dbgDestination.log(BasicLevel.DEBUG,
                                      "LoadingFactor.evalRateOfFlow" +
                                      " pendingMessages = " + pendingMessages +
                                      ", pendingRequests = " + pendingRequests +
                                      ", rateOfFlow = " + rateOfFlow +
                                      ", currentROF = " + currentROF);

    return rateOfFlow;
  }

  /**
   * Runs one load-balancing check.
   * <p>
   * Steps: leave the WAIT state if it has expired, refresh the rate of flow
   * and the activity levels, dispatch to the cluster and enter WAIT when
   * overloaded and not already waiting, then re-evaluate the thresholds.
   *
   * @param clusters        table of the cluster queues, keyed by
   *                        {@link AgentId} with {@link Float} values.
   * @param pendingMessages number of pending messages.
   * @param pendingRequests number of pending requests.
   */
  public void factorCheck(Hashtable clusters,
                          int pendingMessages,
                          int pendingRequests) {
    // Recorded first so that the entry trace below shows the new counts.
    nbOfPendingMessages = pendingMessages;
    nbOfPendingRequests = pendingRequests;

    if (status == Status.WAIT &&
        statusTime < System.currentTimeMillis()) {
      status = Status.RUN;
    }

    if (JoramTracing.dbgDestination.isLoggable(BasicLevel.DEBUG))
      JoramTracing.dbgDestination.log(BasicLevel.DEBUG,
                                      ">> LoadingFactor.factorCheck " +
                                      this + "\nclusters = " + clusters);

    evalRateOfFlow(pendingMessages, pendingRequests);
    evalActivity();

    boolean mayDispatch = (status == Status.INIT || status == Status.RUN);
    if (mayDispatch && isOverloaded()) {
      dispatchAndSendTo(clusters, pendingMessages, pendingRequests);
      setWait();
    }

    updateThreshol();

    if (JoramTracing.dbgDestination.isLoggable(BasicLevel.DEBUG))
      JoramTracing.dbgDestination.log(BasicLevel.DEBUG,
                                      "<< LoadingFactor.factorCheck "
                                      + this);
  }

  /**
   * Tells whether the queue is overloaded, i.e. consumers or producers are
   * in high activity. The result is also kept for {@link #toString()}.
   *
   * @return true if overloaded.
   */
  public boolean isOverloaded() {
    overLoaded =
      (consumerStatus == ConsumerStatus.CONSUMER_HIGH_ACTIVITY) ||
      (producerStatus == ProducerStatus.PRODUCER_HIGH_ACTIVITY);

    if (JoramTracing.dbgDestination.isLoggable(BasicLevel.DEBUG))
      JoramTracing.dbgDestination.log(BasicLevel.DEBUG,
                                      "LoadingFactor.isOverloaded "
                                      + overLoaded);
    return overLoaded;
  }

  /**
   * Computes how many messages to give away and how many to ask for, then
   * triggers the matching exchanges with the cluster.
   * <p>
   * The number to give is the excess of pending messages over the producer
   * threshold; the number to ask for is the whole pending request count once
   * it exceeds the consumer threshold.
   *
   * @param clusters        table of the cluster queues.
   * @param pendingMessages number of pending messages.
   * @param pendingRequests number of pending requests.
   */
  private void dispatchAndSendTo(Hashtable clusters,
                                 int pendingMessages,
                                 int pendingRequests) {
    if ((consumerStatus == ConsumerStatus.CONSUMER_NO_ACTIVITY) &&
        (producerStatus == ProducerStatus.PRODUCER_NO_ACTIVITY)) {
      return;
    }

    int nbMsgGive = -1;
    if (producThreshold < pendingMessages) {
      nbMsgGive = pendingMessages - producThreshold;
    }

    int nbMsgHope = -1;
    if (consumThreshold < pendingRequests) {
      nbMsgHope = pendingRequests;
    }

    if (JoramTracing.dbgDestination.isLoggable(BasicLevel.DEBUG))
      JoramTracing.dbgDestination.log(BasicLevel.DEBUG,
                                      "LoadingFactor.dispatchAndSendTo" +
                                      "\nnbMsgHope=" + nbMsgHope +
                                      ", nbMsgGive=" + nbMsgGive);

    if (consumerStatus == ConsumerStatus.CONSUMER_HIGH_ACTIVITY) {
      processHope(nbMsgHope, clusters);
    }
    if (producerStatus == ProducerStatus.PRODUCER_HIGH_ACTIVITY) {
      processGive(nbMsgGive, clusters);
    }
  }

  /**
   * Collects the queues of the cluster, other than the owning queue, whose
   * recorded value is on the requested side of 1.
   *
   * @param clusters   table of the cluster queues ({@link AgentId} keys,
   *                   {@link Float} values; presumably each queue's last
   *                   known rate of flow, to be confirmed against the caller).
   * @param atLeastOne true to keep values <code>&gt;= 1</code>, false to keep
   *                   values <code>&lt;= 1</code>.
   * @return the identifiers of the selected queues, possibly empty.
   */
  private Vector selectQueues(Hashtable clusters, boolean atLeastOne) {
    Vector selected = new Vector();
    for (Enumeration e = clusters.keys(); e.hasMoreElements();) {
      AgentId id = (AgentId) e.nextElement();
      float value = ((Float) clusters.get(id)).floatValue();
      boolean eligible = atLeastOne ? (value >= 1) : (value <= 1);
      if (eligible && !id.equals(clusterQueueImpl.destId)) {
        selected.add(id);
      }
    }
    return selected;
  }

  /**
   * Takes up to <code>nbMsg</code> messages from the owning queue and
   * forwards them to a queue in a dedicated notification.
   * <p>
   * NOTE(review): whether <code>forward</code> copies the notification is not
   * visible from this class; a fresh instance per recipient keeps each
   * payload independent either way.
   */
  private void sendGive(AgentId id, int nbMsg) {
    LBMessageGive msgGive = new LBMessageGive(validityPeriod, rateOfFlow);
    msgGive.setClientMessages(
      clusterQueueImpl.getClientMessages(nbMsg, null, true));
    clusterQueueImpl.forward(id, msgGive);
  }

  /** Asks a queue for <code>nbMsg</code> messages, in a dedicated notification. */
  private void sendHope(AgentId id, int nbMsg) {
    LBMessageHope msgHope = new LBMessageHope(validityPeriod, rateOfFlow);
    msgHope.setNbMsg(nbMsg);
    clusterQueueImpl.forward(id, msgHope);
  }

  /**
   * Gives messages away to the queues of the cluster whose recorded value
   * is at least 1.
   * <p>
   * The messages are shared out evenly (integer division); when there are
   * fewer messages than selected queues, all go to the first one.
   *
   * @param nbMsgGive number of messages to give; nothing is done below 1.
   * @param clusters  table of the cluster queues.
   */
  private void processGive(int nbMsgGive, Hashtable clusters) {
    if (nbMsgGive < 1) {
      return;
    }

    Vector selected = selectQueues(clusters, true);
    if (selected.isEmpty()) {
      return;
    }

    int nbGivePerQueue = nbMsgGive / selected.size();

    if (nbGivePerQueue == 0) {
      // Not enough messages for everyone: a single queue gets them all.
      AgentId id = (AgentId) selected.get(0);
      if (JoramTracing.dbgDestination.isLoggable(BasicLevel.DEBUG))
        JoramTracing.dbgDestination.log(BasicLevel.DEBUG,
                                        "LoadingFactor.processGive" +
                                        " nbMsgGive = " + nbMsgGive +
                                        ", id = " + id);
      sendGive(id, nbMsgGive);
      return;
    }

    if (JoramTracing.dbgDestination.isLoggable(BasicLevel.DEBUG))
      JoramTracing.dbgDestination.log(BasicLevel.DEBUG,
                                      "LoadingFactor.processGive" +
                                      " givePerQueue = " + nbGivePerQueue +
                                      ", selected = " + selected);
    for (Enumeration e = selected.elements(); e.hasMoreElements();) {
      sendGive((AgentId) e.nextElement(), nbGivePerQueue);
    }
  }

  /**
   * Asks for messages from the queues of the cluster whose recorded value
   * is at most 1.
   * <p>
   * The demand is shared out evenly (integer division); when it is smaller
   * than the number of selected queues, it is sent whole to the first one.
   *
   * @param nbMsgHope number of messages hoped for; nothing is sent below 1.
   * @param clusters  table of the cluster queues.
   */
  private void processHope(int nbMsgHope, Hashtable clusters) {
    if (JoramTracing.dbgDestination.isLoggable(BasicLevel.DEBUG))
      JoramTracing.dbgDestination.log(BasicLevel.DEBUG,
                                      "LoadingFactor.processHope" +
                                      " nbMsgHope = " + nbMsgHope);
    if (nbMsgHope < 1) {
      return;
    }

    Vector selected = selectQueues(clusters, false);
    if (selected.isEmpty()) {
      return;
    }

    int nbHopePerQueue = nbMsgHope / selected.size();

    if (nbHopePerQueue == 0) {
      // Demand too small to split: a single queue is asked for all of it.
      AgentId id = (AgentId) selected.get(0);
      if (JoramTracing.dbgDestination.isLoggable(BasicLevel.DEBUG))
        JoramTracing.dbgDestination.log(BasicLevel.DEBUG,
                                        "LoadingFactor.processHope" +
                                        " nbMsgHope = " + nbMsgHope +
                                        ", id = " + id);
      sendHope(id, nbMsgHope);
      return;
    }

    if (JoramTracing.dbgDestination.isLoggable(BasicLevel.DEBUG))
      JoramTracing.dbgDestination.log(BasicLevel.DEBUG,
                                      "LoadingFactor.processHope" +
                                      " hopePerQueue = " + nbHopePerQueue +
                                      ", selected = " + selected);
    for (Enumeration e = selected.elements(); e.hasMoreElements();) {
      sendHope((AgentId) e.nextElement(), nbHopePerQueue);
    }
  }

  /** Returns a one-line description of the current state, for traces. */
  public String toString() {
    StringBuffer str = new StringBuffer();
    str.append("LoadingFactor (status=");
    str.append(Status.names[status]);
    str.append(", consumerStatus=");
    str.append(ConsumerStatus.names[consumerStatus]);
    str.append(", producerStatus=");
    str.append(ProducerStatus.names[producerStatus]);
    str.append(", producThreshold=");
    str.append(producThreshold);
    str.append(", consumThreshold=");
    str.append(consumThreshold);
    str.append(", autoEvalThreshold=");
    str.append(autoEvalThreshold);
    str.append(", nbOfPendingMessages=");
    str.append(nbOfPendingMessages);
    str.append(", nbOfPendingRequests=");
    str.append(nbOfPendingRequests);
    str.append(", rateOfFlow=");
    str.append(rateOfFlow);
    str.append(", overLoaded=");
    str.append(overLoaded);
    str.append(")");
    return str.toString();
  }
}