1 25 package org.objectweb.joram.mom.dest; 26 27 import java.util.Enumeration ; 28 import java.util.Hashtable ; 29 import java.util.Properties ; 30 import java.util.Vector ; 31 32 import org.objectweb.joram.mom.notifications.AdminReply; 33 import org.objectweb.joram.mom.notifications.ClientMessages; 34 import org.objectweb.joram.mom.notifications.ClusterRequest; 35 import org.objectweb.joram.mom.notifications.DestinationAdminRequestNot; 36 import org.objectweb.joram.mom.notifications.ExceptionReply; 37 import org.objectweb.joram.mom.notifications.Monit_GetCluster; 38 import org.objectweb.joram.mom.notifications.Monit_GetClusterRep; 39 import org.objectweb.joram.mom.notifications.Monit_GetFather; 40 import org.objectweb.joram.mom.notifications.Monit_GetFatherRep; 41 import org.objectweb.joram.mom.notifications.Monit_GetNumberRep; 42 import org.objectweb.joram.mom.notifications.Monit_GetSubscriptions; 43 import org.objectweb.joram.mom.notifications.SetFatherRequest; 44 import org.objectweb.joram.mom.notifications.SetRightRequest; 45 import org.objectweb.joram.mom.notifications.SubscribeReply; 46 import org.objectweb.joram.mom.notifications.SubscribeRequest; 47 import org.objectweb.joram.mom.notifications.TopicMsgsReply; 48 import org.objectweb.joram.mom.notifications.UnclusterRequest; 49 import org.objectweb.joram.mom.notifications.UnsetFatherRequest; 50 import org.objectweb.joram.mom.notifications.UnsubscribeRequest; 51 import org.objectweb.joram.shared.JoramTracing; 52 import org.objectweb.joram.shared.admin.GetSubscriberIds; 53 import org.objectweb.joram.shared.admin.GetSubscriberIdsRep; 54 import org.objectweb.joram.shared.excepts.AccessException; 55 import org.objectweb.joram.shared.excepts.MomException; 56 import org.objectweb.joram.shared.messages.Message; 57 import org.objectweb.joram.shared.selectors.Selector; 58 import org.objectweb.util.monolog.api.BasicLevel; 59 60 import fr.dyade.aaa.agent.AgentId; 61 import fr.dyade.aaa.agent.AgentServer; 62 import fr.dyade.aaa.agent.DeleteNot; 63 import fr.dyade.aaa.agent.Notification; 64 import fr.dyade.aaa.agent.UnknownAgent; 65 66 79 public class TopicImpl extends DestinationImpl implements TopicImplMBean { 80 81 protected AgentId fatherId = null; 82 83 protected Vector friends = null; 84 85 86 protected Vector subscribers; 87 88 protected Hashtable selectors; 89 90 91 protected transient boolean alreadySentLocally; 92 93 100 public TopicImpl(AgentId destId, AgentId adminId, Properties prop) { 101 super(destId, adminId, prop); 102 subscribers = new Vector (); 103 selectors = new Hashtable (); 104 } 105 106 109 public String toString() { 110 return "TopicImpl:" + destId.toString(); 111 } 112 113 120 public void clusterRequest(AgentId from, ClusterRequest req) throws AccessException 121 { 122 if (! isAdministrator(from)) 123 throw new AccessException("ADMIN right not granted"); 124 125 String info = null; 126 if (fatherId != null) { 127 info = strbuf.append("Request [").append(req.getClass().getName()) 128 .append("], sent to Topic [").append(destId) 129 .append("], successful [false]: topic part of a hierarchy").toString(); 130 strbuf.setLength(0); 131 forward(from, new AdminReply(req, false, info)); 132 return; 133 } 134 135 AgentId newFriendId = req.getTopicId(); 136 137 if (friends == null) { 138 setSave(); 140 friends = new Vector (); 141 } 142 143 if (friends.contains(newFriendId) || destId.equals(newFriendId)) { 144 info = strbuf.append("Request [").append(req.getClass().getName()) 145 .append("], sent to Topic [").append(destId) 146 .append("], successful [false]: joining topic already") 147 .append(" part of cluster").toString(); 148 strbuf.setLength(0); 149 forward(from, new AdminReply(req, false, info)); 150 return; 151 } 152 153 ClusterTest not = new ClusterTest(req, from); 154 forward(newFriendId, not); 155 } 156 157 162 public void clusterTest(AgentId from, ClusterTest not) { 163 String info = null; 164 if (friends != null && ! friends.isEmpty()) { 166 info = strbuf.append("Topic [").append(destId) 167 .append("] can't join cluster of topic [").append(from) 168 .append("] as it is already part of a cluster").toString(); 169 strbuf.setLength(0); 170 forward(from, new ClusterAck(not, false, info)); 171 } else if (fatherId != null) { 173 info = strbuf.append("Topic [").append(destId) 174 .append("] can't join cluster of topic [").append(from) 175 .append("] as it is already part of a hierarchy").toString(); 176 strbuf.setLength(0); 177 forward(from, new ClusterAck(not, false, info)); 178 } else { 180 setSave(); 182 friends = new Vector (); 183 friends.add(from); 184 info = strbuf.append("Topic [").append(destId) 185 .append("] ok for joining cluster of topic [").append(from) 186 .append(']').toString(); 187 strbuf.setLength(0); 188 forward(from, new ClusterAck(not, true, info)); 189 190 if (JoramTracing.dbgDestination.isLoggable(BasicLevel.DEBUG)) 191 JoramTracing.dbgDestination.log(BasicLevel.DEBUG, "Topic " 192 + destId.toString() + " joins cluster" 193 + "cluster of topic " + from.toString()); 194 } 195 } 196 197 201 public void clusterAck(AgentId from, ClusterAck ack){ 202 if (! ack.ok) { 204 forward(ack.requester, new AdminReply(ack.request, false, ack.info)); 205 return; 206 } 207 208 AgentId fellowId; 209 ClusterNot fellowNot; 210 ClusterNot newFriendNot = new ClusterNot(from); 211 for (int i = 0; i < friends.size(); i++) { 212 fellowId = (AgentId) friends.get(i); 213 fellowNot = new ClusterNot(fellowId); 214 forward(from, fellowNot); 216 forward(fellowId, newFriendNot); 218 } 219 setSave(); 221 friends.add(from); 222 223 String info = strbuf.append("Request [") 224 .append(ack.request.getClass().getName()) 225 .append("], sent to Topic [").append(destId) 226 .append("], successful [true]: topic [") 227 .append(from).append("] joined cluster").toString(); 228 strbuf.setLength(0); 229 forward(ack.requester, new AdminReply(ack.request, true, info)); 230 231 if (JoramTracing.dbgDestination.isLoggable(BasicLevel.DEBUG)) 232 JoramTracing.dbgDestination.log(BasicLevel.DEBUG, info); 233 } 234 235 240 public void clusterNot(AgentId from, ClusterNot not) { 241 setSave(); 243 friends.add(not.topicId); 244 245 if (JoramTracing.dbgDestination.isLoggable(BasicLevel.DEBUG)) 246 JoramTracing.dbgDestination.log(BasicLevel.DEBUG, "Topic " 247 + not.topicId.toString() 248 + " set as a fellow."); 249 } 250 251 257 public void unclusterRequest(AgentId from, UnclusterRequest request) throws MomException { 258 if (! isAdministrator(from)) 259 throw new AccessException("ADMIN right not granted"); 260 261 if (friends == null || friends.isEmpty()) { 262 String info = strbuf.append("Request [") 263 .append(request.getClass().getName()) 264 .append("], sent to Topic [").append(destId) 265 .append("], successful [false]: topic not part of a cluster") 266 .toString(); 267 strbuf.setLength(0); 268 forward(from, new AdminReply(request, false, info)); 269 return; 270 } 271 272 UnclusterNot not = new UnclusterNot(); 273 AgentId fellowId; 274 while (! friends.isEmpty()) { 276 setSave(); 278 fellowId = (AgentId) friends.remove(0); 279 forward(fellowId, not); 280 } 281 friends = null; 282 283 String info = strbuf.append("Request [") 284 .append(request.getClass().getName()) 285 .append("], sent to Topic [").append(destId) 286 .append("], successful [true]: topic left the cluster").toString(); 287 strbuf.setLength(0); 288 forward(from, new AdminReply(request, true, info)); 289 290 if (JoramTracing.dbgDestination.isLoggable(BasicLevel.DEBUG)) 291 JoramTracing.dbgDestination.log(BasicLevel.DEBUG, info); 292 } 293 294 298 public void unclusterNot(AgentId from, UnclusterNot not) { 299 setSave(); 301 friends.remove(from); 302 303 if (friends.isEmpty()) friends = null; 304 305 if (JoramTracing.dbgDestination.isLoggable(BasicLevel.DEBUG)) 306 JoramTracing.dbgDestination.log(BasicLevel.DEBUG, "Topic " 307 + from.toString() + " removed from" 308 + " cluster."); 309 } 310 311 317 public void setFatherRequest(AgentId from, SetFatherRequest request) throws MomException { 318 if (! isAdministrator(from)) 319 throw new AccessException("ADMIN right not granted"); 320 321 if (fatherId != null) { 322 strbuf.append("Request [").append(request.getClass().getName()) 323 .append("], sent to Topic [").append(destId) 324 .append("], successful [false]: topic already part of a hierarchy"); 325 forward(from, new AdminReply(request, false, strbuf.toString())); 326 strbuf.setLength(0); 327 return; 328 } 329 330 if (friends != null) { 331 strbuf.append("Request [").append(request.getClass().getName()) 332 .append("], sent to Topic [").append(destId) 333 .append("], successful [false]: topic already part of a cluster"); 334 forward(from, new AdminReply(request, false, strbuf.toString())); 335 strbuf.setLength(0); 336 return; 337 } 338 339 forward(request.getFatherId(), new FatherTest(request, from)); 340 } 341 342 346 public void fatherTest(AgentId from, FatherTest not) { 347 if (friends != null && ! friends.isEmpty()) { 348 strbuf.append("Topic [").append(destId) 349 .append("] can't accept topic [").append(from) 350 .append("] as a son as it is part of a cluster"); 351 forward(from, new FatherAck(not, false, strbuf.toString())); 352 strbuf.setLength(0); 353 } else { 354 strbuf.append("Topic [").append(destId) 355 .append("] accepts topic [").append(from).append("] as a son"); 356 forward(from, new FatherAck(not, true, strbuf.toString())); 357 strbuf.setLength(0); 358 } 359 } 360 361 365 public void fatherAck(AgentId from, FatherAck not) { 366 if (! not.ok) { 368 forward(not.requester, new AdminReply(not.request, false, not.info)); 369 return; 370 } 371 372 setSave(); 374 fatherId = from; 376 377 String info = strbuf.append("Request [") 378 .append(not.request.getClass().getName()) 379 .append("], sent to Topic [").append(destId) 380 .append("], successful [true]: topic [") 381 .append(from).append("] set as father").toString(); 382 strbuf.setLength(0); 383 forward(not.requester, new AdminReply(not.request, true, info)); 384 385 if (JoramTracing.dbgDestination.isLoggable(BasicLevel.DEBUG)) 386 JoramTracing.dbgDestination.log(BasicLevel.DEBUG, info); 387 } 388 389 395 public void unsetFatherRequest(AgentId from, UnsetFatherRequest request) throws MomException { 396 if (! isAdministrator(from)) 397 throw new AccessException("ADMIN right not granted"); 398 399 String info = null; 400 if (fatherId == null) { 401 info = strbuf.append("Request [").append(request.getClass().getName()) 402 .append("], sent to Topic [").append(destId) 403 .append("], successful [false]: topic is not a son").toString(); 404 strbuf.setLength(0); 405 forward(from, new AdminReply(request, false, info)); 406 return; 407 } 408 409 setSave(); 411 fatherId = null; 412 413 info = strbuf.append("Request [").append(request.getClass().getName()) 414 .append("], sent to Topic [").append(destId) 415 .append("], successful [true]: father unset").toString(); 416 strbuf.setLength(0); 417 forward(from, new AdminReply(request, true, info)); 418 419 if (JoramTracing.dbgDestination.isLoggable(BasicLevel.DEBUG)) 420 JoramTracing.dbgDestination.log(BasicLevel.DEBUG, info); 421 } 422 423 430 public void monitGetSubscriptions(AgentId from, Monit_GetSubscriptions not) throws AccessException { 431 if (! isAdministrator(from)) 432 throw new AccessException("ADMIN right not granted"); 433 434 forward(from, new Monit_GetNumberRep(not, subscribers.size())); 435 } 436 437 443 public void monitGetFather(AgentId from, Monit_GetFather not) throws AccessException { 444 if (! isAdministrator(from)) 445 throw new AccessException("ADMIN right not granted"); 446 447 String id = null; 448 if (fatherId != null) 449 id = fatherId.toString(); 450 451 forward(from, new Monit_GetFatherRep(not, id)); 452 } 453 454 460 public void monitGetCluster(AgentId from, Monit_GetCluster not) throws AccessException { 461 if (! isAdministrator(from)) 462 throw new AccessException("ADMIN right not granted"); 463 464 Vector cluster = null; 465 if (friends != null) { 466 cluster = new Vector (); 467 for (int i = 0; i < friends.size(); i++) 468 cluster.add(friends.get(i).toString()); 469 cluster.add(destId.toString()); 470 } 471 472 forward(from, new Monit_GetClusterRep(not, cluster)); 473 } 474 475 public void preSubscribe(SubscribeRequest not) { 476 } 478 479 public void postSubscribe(SubscribeRequest not) { 480 } 482 483 489 public void subscribeRequest(AgentId from, SubscribeRequest not) throws AccessException { 490 if (! isReader(from)) 491 throw new AccessException("READ right not granted"); 492 493 preSubscribe(not); 494 495 if (! subscribers.contains(from)) { 497 setSave(); 499 subscribers.add(from); 500 } 501 502 setSave(); 504 505 if (not.getSelector() != null && ! not.getSelector().equals("")) 509 selectors.put(from, not.getSelector()); 510 else 511 selectors.remove(from); 512 513 forward(from, new SubscribeReply(not)); 514 515 postSubscribe(not); 516 517 if (JoramTracing.dbgDestination.isLoggable(BasicLevel.DEBUG)) 518 JoramTracing.dbgDestination.log(BasicLevel.DEBUG, 519 "Client " + from 520 + " set as a subscriber with selector " 521 + not.getSelector()); 522 } 523 524 public void preUnsubscribe(UnsubscribeRequest not) { 525 } 527 528 public void postUnsubscribe(UnsubscribeRequest not) { 529 } 531 532 536 public void unsubscribeRequest(AgentId from, UnsubscribeRequest not) { 537 538 preUnsubscribe(not); 539 540 setSave(); 542 subscribers.remove(from); 543 selectors.remove(from); 544 545 postUnsubscribe(not); 546 547 if (JoramTracing.dbgDestination.isLoggable(BasicLevel.DEBUG)) 548 JoramTracing.dbgDestination.log(BasicLevel.DEBUG, 549 "Client " + from 550 + " removed from the subscribers."); 551 } 552 553 558 public void topicForwardNot(AgentId from, TopicForwardNot not) { 559 if (not.toFather && fatherId != null) { 561 forward(fatherId, not); 562 alreadySentLocally = fatherId.getTo() == AgentServer.getServerId(); 563 } 564 565 processMessages(not.messages); 567 } 568 569 public void destinationAdminRequestNot(AgentId from, DestinationAdminRequestNot not) { 570 org.objectweb.joram.shared.admin.AdminRequest adminRequest = 571 not.getRequest(); 572 if (adminRequest instanceof GetSubscriberIds) { 573 getSubscriberIds((GetSubscriberIds)adminRequest, 574 not.getReplyTo(), 575 not.getRequestMsgId(), 576 not.getReplyMsgId()); 577 } 578 } 579 580 private void getSubscriberIds(GetSubscriberIds request, 581 AgentId replyTo, 582 String requestMsgId, 583 String replyMsgId) { 584 GetSubscriberIdsRep reply = 585 new GetSubscriberIdsRep(getSubscriberIds()); 586 replyToTopic(reply, replyTo, requestMsgId, replyMsgId); 587 } 588 589 596 public String [] getSubscriberIds() { 597 String [] res = new String [subscribers.size()]; 598 for (int i = 0; i < res.length; i++) { 599 AgentId aid = (AgentId)subscribers.elementAt(i); 600 res[i] = aid.toString(); 601 } 602 return res; 603 } 604 605 611 protected void doRightRequest(SetRightRequest not) { 612 if (not.getRight() != -READ) 614 return; 615 616 SetRightRequest rightRequest = preProcess(not); 617 618 if (rightRequest != null) { 619 AgentId user = rightRequest.getClient(); 620 AccessException exc = new AccessException("READ right removed."); 621 622 if (user != null) { 624 setSave(); 626 subscribers.remove(user); 627 selectors.remove(user); 628 forward(user, new ExceptionReply(exc)); 629 } 630 else { 632 for (Enumeration subs = subscribers.elements(); 633 subs.hasMoreElements();) { 634 user = (AgentId) subs.nextElement(); 635 if (! isReader(user)) { 636 setSave(); 638 subscribers.remove(user); 639 selectors.remove(user); 640 forward(user, new ExceptionReply(exc)); 641 } 642 } 643 } 644 645 postProcess(rightRequest); 646 } 647 } 648 649 656 protected void doClientMessages(AgentId from, ClientMessages not) { 657 ClientMessages clientMsgs = preProcess(from, not); 658 659 if (clientMsgs != null) { 660 forwardMessages(clientMsgs); 662 processMessages(clientMsgs); 664 665 postProcess(clientMsgs); 666 } 667 } 668 669 677 protected void doUnknownAgent(UnknownAgent uA) { 678 AgentId agId = uA.agent; 679 Notification not = uA.not; 680 681 String info = null; 684 if (not instanceof ClusterTest) { 685 ClusterTest cT = (ClusterTest) not; 686 info = strbuf.append("Topic [").append(agId) 687 .append("] can't join cluster as it does not exist").toString(); 688 strbuf.setLength(0); 689 forward(cT.requester, new AdminReply(cT.request, false, info)); 690 } else if (not instanceof FatherTest) { 691 FatherTest fT = (FatherTest) not; 693 info = strbuf.append("Topic [").append(agId) 694 .append("] can't join hierarchy as it does not exist").toString(); 695 strbuf.setLength(0); 696 forward(fT.requester, new AdminReply(fT.request, false, info)); 697 } else { 698 setSave(); 700 subscribers.remove(agId); 702 selectors.remove(agId); 703 704 if (fatherId != null && agId.equals(fatherId)) { 706 setSave(); 708 fatherId = null; 709 } 710 } 711 } 712 713 721 protected void doDeleteNot(DeleteNot not) { 722 AgentId clientId; 723 Vector subs; 724 SubscribeRequest sub; 725 726 for (int i = 0; i < subscribers.size(); i++) { 728 clientId = (AgentId) subscribers.get(i); 729 forward(clientId, new UnknownAgent(destId, null)); 730 } 731 732 if (friends != null) { 734 AgentId topicId; 735 while (! friends.isEmpty()) { 736 setSave(); 738 topicId = (AgentId) friends.remove(0); 739 forward(topicId, new UnclusterNot()); 740 } 741 } 742 } 743 744 748 protected void forwardMessages(ClientMessages messages) 749 { 750 if (friends != null && ! friends.isEmpty()) { 751 AgentId topicId; 752 for (int i = 0; i < friends.size(); i++) { 753 topicId = (AgentId) friends.get(i); 754 forward(topicId, new TopicForwardNot(messages, false)); 755 756 if (JoramTracing.dbgDestination.isLoggable(BasicLevel.DEBUG)) 757 JoramTracing.dbgDestination.log(BasicLevel.DEBUG, "Messages " 758 + "forwarded to fellow " 759 + topicId.toString()); 760 } 761 } else if (fatherId != null) { 762 forward(fatherId, new TopicForwardNot(messages, true)); 763 764 if (JoramTracing.dbgDestination.isLoggable(BasicLevel.DEBUG)) 765 JoramTracing.dbgDestination.log(BasicLevel.DEBUG, "Messages " 766 + "forwarded to father " 767 + fatherId.toString()); 768 } 769 } 770 771 776 protected void processMessages(ClientMessages not) { 777 Vector messages = not.getMessages(); 778 AgentId subscriber; 779 boolean local; 780 String selector; 781 Vector deliverables; 782 Message message; 783 784 nbMsgsReceiveSinceCreation = nbMsgsReceiveSinceCreation + messages.size(); 785 786 setNoSave(); 787 boolean persistent = false; 788 789 for (Enumeration subs = subscribers.elements(); subs.hasMoreElements();) { 790 subscriber = (AgentId) subs.nextElement(); 792 local = (subscriber.getTo() == AgentServer.getServerId()); 793 selector = (String ) selectors.get(subscriber); 794 795 if (selector == null || selector.equals("")) { 796 if (! local) { 799 deliverables = messages; 802 persistent = true; 803 } else if (! alreadySentLocally) { 804 deliverables = messages; 805 alreadySentLocally = true; 806 } 807 else { 809 deliverables = new Vector (); 810 for (Enumeration msgs = messages.elements(); msgs.hasMoreElements();) 811 deliverables.add(((Message) msgs.nextElement()).clone()); 812 } 813 } else { 814 deliverables = new Vector (); 816 for (int i = 0; i < messages.size(); i++) { 817 message = (Message) messages.get(i); 818 819 if (Selector.matches(message, selector)) { 820 821 if (! local) { 824 deliverables.add(message); 825 persistent = true; 826 } else if (! alreadySentLocally) { 827 deliverables.add(message); 828 alreadySentLocally = true; 829 } 830 else 832 deliverables.add(message.clone()); 833 } 834 } 835 } 836 if (! deliverables.isEmpty()) { 838 TopicMsgsReply topicMsgsReply = new TopicMsgsReply(deliverables); 839 topicMsgsReply.setPersistent(persistent); 840 forward(subscriber, topicMsgsReply); 841 nbMsgsDeliverSinceCreation = nbMsgsDeliverSinceCreation + deliverables.size(); 842 } 843 } 844 } 845 846 public void setAlreadySentLocally(boolean alreadySentLocally) { 847 this.alreadySentLocally = alreadySentLocally; 848 } 849 } 850 | Popular Tags |