1 24 package org.objectweb.joram.mom.dest; 25 26 import java.io.IOException ; 27 import java.util.ArrayList ; 28 import java.util.Enumeration ; 29 import java.util.Hashtable ; 30 import java.util.List ; 31 import java.util.Properties ; 32 import java.util.Vector ; 33 34 import org.objectweb.joram.mom.messages.Message; 35 import org.objectweb.joram.mom.notifications.AckJoinQueueCluster; 36 import org.objectweb.joram.mom.notifications.ClientMessages; 37 import org.objectweb.joram.mom.notifications.JoinQueueCluster; 38 import org.objectweb.joram.mom.notifications.LBCycleLife; 39 import org.objectweb.joram.mom.notifications.LBMessageGive; 40 import org.objectweb.joram.mom.notifications.LBMessageHope; 41 import org.objectweb.joram.mom.notifications.LeaveQueueCluster; 42 import org.objectweb.joram.mom.notifications.QueueClusterNot; 43 import org.objectweb.joram.mom.notifications.ReceiveRequest; 44 import org.objectweb.joram.mom.notifications.SetRightQueueCluster; 45 import org.objectweb.joram.mom.notifications.SetRightRequest; 46 import org.objectweb.joram.mom.notifications.SpecialAdminRequest; 47 import org.objectweb.joram.mom.notifications.WakeUpNot; 48 import org.objectweb.joram.shared.JoramTracing; 49 import org.objectweb.joram.shared.admin.AddQueueCluster; 50 import org.objectweb.joram.shared.admin.ListClusterQueue; 51 import org.objectweb.joram.shared.admin.RemoveQueueCluster; 52 import org.objectweb.joram.shared.admin.SpecialAdmin; 53 import org.objectweb.joram.shared.excepts.RequestException; 54 import org.objectweb.util.monolog.api.BasicLevel; 55 56 import fr.dyade.aaa.agent.AgentId; 57 import fr.dyade.aaa.agent.UnknownNotificationException; 58 59 64 public class ClusterQueueImpl extends QueueImpl { 65 69 protected Hashtable clusters; 70 71 72 protected LoadingFactor loadingFactor; 73 74 78 private Hashtable timeTable; 79 80 83 private Hashtable visitTable; 84 85 86 private long clusterDeliveryCount; 87 88 89 private long waitAfterClusterReq = -1; 90 91 97 public ClusterQueueImpl(AgentId destId, AgentId adminId, Properties prop) { 98 super(destId, adminId, prop); 99 100 101 int producThreshold = -1; 102 103 int consumThreshold = -1; 104 105 boolean autoEvalThreshold = false; 106 107 if (prop != null) { 108 try { 109 waitAfterClusterReq = 110 Long.valueOf(prop.getProperty("waitAfterClusterReq")).longValue(); 111 } catch (NumberFormatException exc) { 112 waitAfterClusterReq = 60000; 113 } 114 try { 115 producThreshold = 116 Integer.valueOf(prop.getProperty("producThreshold")).intValue(); 117 } catch (NumberFormatException exc) { 118 producThreshold = 10000; 119 } 120 try { 121 consumThreshold = 122 Integer.valueOf(prop.getProperty("consumThreshold")).intValue(); 123 } catch (NumberFormatException exc) { 124 consumThreshold = 10000; 125 } 126 autoEvalThreshold = 127 Boolean.valueOf(prop.getProperty("autoEvalThreshold")).booleanValue(); 128 } 129 130 clusters = new Hashtable (); 131 clusters.put(destId, new Float (1)); 132 133 loadingFactor = new LoadingFactor(this, 134 producThreshold, 135 consumThreshold, 136 autoEvalThreshold, 137 waitAfterClusterReq); 138 timeTable = new Hashtable (); 139 visitTable = new Hashtable (); 140 clusterDeliveryCount = 0; 141 142 } 143 144 public String toString() { 145 return "ClusterQueueImpl:" + destId.toString(); 146 } 147 148 153 public void postProcess(SetRightRequest not) { 154 sendToCluster( 155 new SetRightQueueCluster( 156 loadingFactor.getRateOfFlow(), 157 not, 158 clients)); 159 if (JoramTracing.dbgDestination.isLoggable(BasicLevel.DEBUG)) 160 JoramTracing.dbgDestination.log(BasicLevel.DEBUG, 161 "--- " + this + 162 " ClusterQueueImpl.postProcess(" + not + ")" + 163 "\nclients=" + clients); 164 } 165 166 171 public Object specialAdminProcess(SpecialAdminRequest not) 172 throws RequestException { 173 174 Object ret = null; 175 try { 176 SpecialAdmin req = not.getRequest(); 177 178 if (JoramTracing.dbgDestination.isLoggable(BasicLevel.DEBUG)) 179 JoramTracing.dbgDestination.log(BasicLevel.DEBUG, 180 "--- " + this + 181 " specialAdminProcess : " + 182 req); 183 184 if (req instanceof AddQueueCluster) { 185 addQueueCluster(((AddQueueCluster) req).joiningQueue, 186 loadingFactor.getRateOfFlow()); 187 } else if (req instanceof RemoveQueueCluster) { 188 broadcastLeave(((RemoveQueueCluster) req).removeQueue); 189 removeQueueCluster(((RemoveQueueCluster) req).removeQueue); 190 } else if(req instanceof ListClusterQueue) { 191 ret = doList((ListClusterQueue) req); 192 } 193 } catch (Exception exc) { 194 if (JoramTracing.dbgDestination.isLoggable(BasicLevel.WARN)) 195 JoramTracing.dbgDestination.log(BasicLevel.WARN, 196 "--- " + this + 197 " specialAdminProcess", 198 exc); 199 throw new RequestException(exc.getMessage()); 200 } 201 return ret; 202 } 203 204 210 protected Object doList(ListClusterQueue req) { 211 Vector vect = new Vector (); 212 for (Enumeration e = clusters.keys(); e.hasMoreElements(); ) 213 vect.add(e.nextElement().toString()); 214 return vect; 215 } 216 217 223 protected void addQueueCluster(String joiningQueue, float rateOfFlow) { 224 AgentId id = AgentId.fromString(joiningQueue); 225 if (clusters.containsKey(id)) return; 226 227 229 if (JoramTracing.dbgDestination.isLoggable(BasicLevel.DEBUG)) 230 JoramTracing.dbgDestination.log(BasicLevel.DEBUG, 231 "--- " + this + 232 " ClusterQueueImpl.addQueueCluster in " + destId + 233 "\njoiningQueue=" + joiningQueue + 234 "\nclusters=" + clusters); 235 236 forward(id, 237 new JoinQueueCluster(loadingFactor.getRateOfFlow(), 238 clusters, 239 clients, 240 freeReading, 241 freeWriting)); 242 } 243 244 249 protected void broadcastLeave(String removeQueue) { 250 sendToCluster(new LeaveQueueCluster(removeQueue)); 251 } 252 253 258 public void removeQueueCluster(String removeQueue) { 259 AgentId id = AgentId.fromString(removeQueue); 260 if (destId.equals(id)) { 261 clusters.clear(); 262 } else 263 clusters.remove(id); 264 265 for (Enumeration e = visitTable.elements(); e.hasMoreElements(); ) { 266 Vector visit = (Vector ) e.nextElement(); 267 if (visit.contains(id)) 268 visit.remove(id); 269 } 270 271 if (JoramTracing.dbgDestination.isLoggable(BasicLevel.DEBUG)) 272 JoramTracing.dbgDestination.log(BasicLevel.DEBUG, 273 "--- " + this + 274 " ClusterQueueImpl.removeQueueCluster in " + destId + 275 "\nremoveQueue=" + removeQueue + 276 "\nclusters=" + clusters); 277 } 278 279 286 public ClientMessages preProcess(AgentId from, ClientMessages not) { 287 if (JoramTracing.dbgDestination.isLoggable(BasicLevel.DEBUG)) 288 JoramTracing.dbgDestination.log(BasicLevel.DEBUG, 289 "--- " + this + 290 " " + not); 291 receiving = true; 292 long date = System.currentTimeMillis(); 293 294 Message msg; 295 for (Enumeration msgs = not.getMessages().elements(); 297 msgs.hasMoreElements();) { 298 msg = new Message((org.objectweb.joram.shared.messages.Message) msgs.nextElement()); 299 msg.order = arrivalsCounter++; 300 storeMsgIdInTimeTable(msg.getIdentifier(), 301 new Long (date)); 302 } 304 return not; 305 } 306 307 313 public void postProcess(ClientMessages not) { 314 if (getMessageCounter() > loadingFactor.producThreshold) 315 loadingFactor.factorCheck(clusters, 316 getMessageCounter(), 317 getWaitingRequestCount()); 318 else 319 loadingFactor.evalRateOfFlow(getMessageCounter(), 320 getWaitingRequestCount()); 321 receiving = false; 322 } 323 324 325 326 331 public void setRightQueueCluster(SetRightQueueCluster not) { 332 try { 333 AgentId user = not.setRightRequest.getClient(); 334 int right = not.setRightRequest.getRight(); 335 super.processSetRight(user,right); 336 } catch (RequestException exc) {} 337 super.doRightRequest(not.setRightRequest); 338 if (JoramTracing.dbgDestination.isLoggable(BasicLevel.DEBUG)) 339 JoramTracing.dbgDestination.log(BasicLevel.DEBUG, 340 "--- " + this + 341 " ClusterQueueImpl.setRightQueueCluster(" + not + ")" + 342 "\nclients=" + clients); 343 } 344 345 352 public void wakeUpNot(WakeUpNot not) { 353 if (JoramTracing.dbgDestination.isLoggable(BasicLevel.DEBUG)) 354 JoramTracing.dbgDestination.log(BasicLevel.DEBUG, 355 "--- " + this + 356 " ClusterQueueImpl.wakeUpNot(" + not + ")"); 357 super.wakeUpNot(not); 358 359 if (clusters.size() > 1) 360 loadingFactor.factorCheck(clusters, 361 getMessageCounter(), 362 getWaitingRequestCount()); 363 364 List toGive = new ArrayList (); 367 long oldTime = System.currentTimeMillis() - period; 368 for (Enumeration e = timeTable.keys(); e.hasMoreElements(); ) { 369 String msgId = (String ) e.nextElement(); 370 if (((Long ) timeTable.get(msgId)).longValue() < oldTime) { 371 toGive.add(msgId); 372 storeMsgIdInVisitTable(msgId,destId); 373 } 374 } 375 376 if (toGive.isEmpty()) return; 377 378 Hashtable table = new Hashtable (); 379 for (int i = 0; i < toGive.size(); i++) { 380 String msgId = (String ) toGive.get(i); 381 Vector visit = (Vector ) visitTable.get(msgId); 382 for (Enumeration e = clusters.keys(); e.hasMoreElements(); ) { 383 AgentId id = (AgentId) e.nextElement(); 384 if (! visit.contains(id)) { 385 Message message = getMessage(msgId, true); 386 if (message != null) { 387 LBCycleLife cycle = (LBCycleLife) table.get(id); 388 if (cycle == null) { 389 cycle = new LBCycleLife(loadingFactor.getRateOfFlow()); 390 cycle.setClientMessages(new ClientMessages()); 391 } 392 ClientMessages cm = cycle.getClientMessages(); 393 cm.addMessage(message.msg); 394 cycle.putInVisitTable(msgId,visit); 395 table.put(id,cycle); 396 break; 397 } 398 } 399 } 400 } 401 402 for (Enumeration e = table.keys(); e.hasMoreElements(); ) { 403 AgentId id = (AgentId) e.nextElement(); 404 forward(id,(LBCycleLife) table.get(id)); 405 } 406 } 407 408 416 public void lBCycleLife(AgentId from, LBCycleLife not) { 417 418 clusters.put(from,new Float (not.getRateOfFlow())); 419 420 Hashtable vT = not.getVisitTable(); 421 for (Enumeration e = vT.keys(); e.hasMoreElements(); ) { 422 String msgId = (String ) e.nextElement(); 423 visitTable.put(msgId,vT.get(msgId)); 424 } 425 426 if (JoramTracing.dbgDestination.isLoggable(BasicLevel.DEBUG)) 427 JoramTracing.dbgDestination.log(BasicLevel.DEBUG, 428 "--- " + this + 429 " ClusterQueueImpl.lBCycleLife(" + not + ")" + 430 "\nvisitTable=" + clusters); 431 ClientMessages cm = not.getClientMessages(); 432 if (cm != null) 433 doClientMessages(from, cm); 434 } 435 436 442 public void joinQueueCluster(JoinQueueCluster not) { 443 for (Enumeration e = not.clusters.keys(); e.hasMoreElements(); ) { 444 AgentId id = (AgentId) e.nextElement(); 445 if (! clusters.containsKey(id)) 446 clusters.put(id,not.clusters.get(id)); 447 } 448 for (Enumeration e = not.clients.keys(); e.hasMoreElements(); ) { 449 AgentId user = (AgentId) e.nextElement(); 450 if (clients.containsKey(user)) { 451 Integer right = (Integer ) not.clients.get(user); 452 if (right.compareTo((Integer ) clients.get(user)) > 0) 453 clients.put(user,right); 454 } else 455 clients.put(user,not.clients.get(user)); 456 } 457 458 freeReading = freeReading | not.freeReading; 459 freeWriting = freeWriting | not.freeWriting; 460 461 sendToCluster( 462 new AckJoinQueueCluster(loadingFactor.getRateOfFlow(), 463 clusters, 464 clients, 465 freeReading, 466 freeWriting)); 467 468 if (JoramTracing.dbgDestination.isLoggable(BasicLevel.DEBUG)) 469 JoramTracing.dbgDestination.log(BasicLevel.DEBUG, 470 "--- " + this + 471 " ClusterQueueImpl.joinQueueCluster(" + not + ")" + 472 "\nclusters=" + clusters + 473 "\nclients=" + clients); 474 } 475 476 480 public void ackJoinQueueCluster(AckJoinQueueCluster not) { 481 for (Enumeration e = not.clusters.keys(); e.hasMoreElements(); ) { 482 AgentId id = (AgentId) e.nextElement(); 483 if (! clusters.containsKey(id)) 484 clusters.put(id,not.clusters.get(id)); 485 } 486 for (Enumeration e = not.clients.keys(); e.hasMoreElements(); ) { 487 AgentId user = (AgentId) e.nextElement(); 488 if (clients.containsKey(user)) { 489 Integer right = (Integer ) not.clients.get(user); 490 if (right.compareTo((Integer ) clients.get(user)) > 0) 491 clients.put(user,right); 492 } else 493 clients.put(user,not.clients.get(user)); 494 } 495 496 freeReading = freeReading | not.freeReading; 497 freeWriting = freeWriting | not.freeWriting; 498 499 if (JoramTracing.dbgDestination.isLoggable(BasicLevel.DEBUG)) 500 JoramTracing.dbgDestination.log(BasicLevel.DEBUG, 501 "--- " + this + 502 " ClusterQueueImpl.ackJoinQueueCluster(" + not + ")" + 503 "\nclusters=" + clusters + 504 "\nclients=" + clients); 505 } 506 507 511 public void receiveRequest(ReceiveRequest not) { 512 if (JoramTracing.dbgDestination.isLoggable(BasicLevel.DEBUG)) 513 JoramTracing.dbgDestination.log(BasicLevel.DEBUG, 514 "--- " + this + 515 " ClusterQueueImpl.receiveRequest(" + not + ")"); 516 517 519 if (getWaitingRequestCount() > loadingFactor.consumThreshold) 520 loadingFactor.factorCheck(clusters, 521 getMessageCounter(), 522 getWaitingRequestCount()); 523 } 524 525 533 public void lBMessageGive(AgentId from, LBMessageGive not) 534 throws UnknownNotificationException { 535 if (JoramTracing.dbgDestination.isLoggable(BasicLevel.DEBUG)) 536 JoramTracing.dbgDestination.log(BasicLevel.DEBUG, 537 "--- " + this + 538 " ClusterQueueImpl.lBMessageGive(" + from + "," + not + ")"); 539 540 clusters.put(from,new Float (not.getRateOfFlow())); 541 542 ClientMessages cm = not.getClientMessages(); 543 if (cm != null) 544 doClientMessages(from, cm); 545 } 546 547 553 public void lBMessageHope(AgentId from, LBMessageHope not) { 554 if (JoramTracing.dbgDestination.isLoggable(BasicLevel.DEBUG)) 555 JoramTracing.dbgDestination.log(BasicLevel.DEBUG, 556 "--- " + this + 557 " ClusterQueueImpl.lBMessageHope(" + from + "," + not + ")"); 558 559 clusters.put(from,new Float (not.getRateOfFlow())); 560 561 int hope = not.getNbMsg(); 562 563 long current = System.currentTimeMillis(); 564 ClientMessages deadMessages = cleanPendingMessage(current); 566 if (deadMessages != null) 568 sendToDMQ(deadMessages, null); 569 570 if (loadingFactor.getRateOfFlow() < 1) { 571 int possibleGive = getMessageCounter() - getWaitingRequestCount(); 572 LBMessageGive msgGive = 573 new LBMessageGive(waitAfterClusterReq,loadingFactor.getRateOfFlow()); 574 575 ClientMessages cm = null; 577 if (possibleGive > hope) { 578 cm = getClientMessages(hope, null, true); 579 } else { 580 cm = getClientMessages(possibleGive, null, true); 581 } 582 583 msgGive.setClientMessages(cm); 584 msgGive.setRateOfFlow( 585 loadingFactor.evalRateOfFlow(getMessageCounter(), getWaitingRequestCount())); 586 587 forward(from, msgGive); 589 590 if (JoramTracing.dbgDestination.isLoggable(BasicLevel.DEBUG)) 591 JoramTracing.dbgDestination.log(BasicLevel.DEBUG, 592 "--- " + this + 593 " ClusterQueueImpl.lBMessageHope LBMessageHope : nbMsgSend = " + 594 cm.getMessages().size()); 595 } 596 } 597 598 607 protected ClientMessages getClientMessages(int nb, String selector, boolean remove) { 608 ClientMessages cm = super.getClientMessages(nb, selector, remove); 609 for (Enumeration e = cm.getMessages().elements(); e.hasMoreElements(); ) { 611 org.objectweb.joram.shared.messages.Message message = 612 (org.objectweb.joram.shared.messages.Message) e.nextElement(); 613 monitoringMsgSendToCluster(message.id); 614 } 615 return cm; 616 } 617 618 626 protected Message getMessage(String msgId, boolean remove) { 627 Message msg = super.getMessage(msgId, remove); 628 if (msg != null) { 629 monitoringMsgSendToCluster(msg.getIdentifier()); 630 } 631 return msg; 632 } 633 634 639 protected void sendToCluster(QueueClusterNot not) { 640 if (JoramTracing.dbgDestination.isLoggable(BasicLevel.DEBUG)) 641 JoramTracing.dbgDestination.log(BasicLevel.DEBUG, 642 "--- " + this + 643 " ClusterQueueImpl.sendToCluster(" + not + ")"); 644 645 if (clusters.size() < 2) return; 646 647 for (Enumeration e = clusters.keys(); e.hasMoreElements(); ) { 648 AgentId id = (AgentId) e.nextElement(); 649 if (! id.equals(destId)) 650 forward(id,not); 651 } 652 } 653 654 657 public long getClusterDeliveryCount() { 658 return clusterDeliveryCount; 659 } 660 661 666 private void storeMsgIdInTimeTable(String msgId, Long date) { 667 try { 668 timeTable.put(msgId,date); 669 } catch (NullPointerException exc) {} 670 } 671 672 677 private void storeMsgIdInVisitTable(String msgId, AgentId destId) { 678 Vector alreadyVisit = (Vector ) visitTable.get(msgId); 679 if (alreadyVisit == null) alreadyVisit = new Vector (); 680 alreadyVisit.add(destId); 681 visitTable.put(msgId,alreadyVisit); 682 } 683 684 688 protected void messageDelivered(String msgId) { 689 timeTable.remove(msgId); 690 visitTable.remove(msgId); 691 } 692 693 697 protected void monitoringMsgSendToCluster(String msgId) { 698 timeTable.remove(msgId); 699 visitTable.remove(msgId); 700 clusterDeliveryCount++; 701 } 702 703 707 public void setWaitAfterClusterReq(long waitAfterClusterReq) { 708 this.waitAfterClusterReq = waitAfterClusterReq; 709 loadingFactor.validityPeriod = waitAfterClusterReq; 710 } 711 712 716 public void setProducThreshold(int producThreshold) { 717 loadingFactor.producThreshold = producThreshold; 718 } 719 720 724 public void setConsumThreshold(int consumThreshold) { 725 loadingFactor.consumThreshold = consumThreshold; 726 } 727 728 732 public void setAutoEvalThreshold(boolean autoEvalThreshold) { 733 loadingFactor.autoEvalThreshold = autoEvalThreshold; 734 } 735 736 742 private void readObject(java.io.ObjectInputStream in) 743 throws IOException , ClassNotFoundException { 744 745 in.defaultReadObject(); 746 747 if (JoramTracing.dbgDestination.isLoggable(BasicLevel.DEBUG)) 748 JoramTracing.dbgDestination.log(BasicLevel.DEBUG, 749 "--- " + this + 750 " ClusterQueueImpl.readObject" + 751 " loadingFactor = " + loadingFactor); 752 } 753 } 754 | Popular Tags |