1 26 package org.objectweb.joram.mom.proxies; 27 28 import java.io.IOException ; 29 import java.io.ObjectInputStream ; 30 import java.io.ObjectOutputStream ; 31 32 import java.util.Enumeration ; 33 import java.util.Hashtable ; 34 import java.util.Vector ; 35 36 import fr.dyade.aaa.agent.AgentId; 37 import fr.dyade.aaa.agent.DeleteNot; 38 import fr.dyade.aaa.agent.Notification; 39 import fr.dyade.aaa.agent.UnknownAgent; 40 import fr.dyade.aaa.agent.UnknownNotificationException; 41 import fr.dyade.aaa.agent.Channel; 42 43 import org.objectweb.joram.mom.dest.*; 44 import org.objectweb.joram.mom.notifications.*; 45 import org.objectweb.joram.mom.messages.Message; 46 47 import org.objectweb.joram.shared.admin.DeleteUser; 48 import org.objectweb.joram.shared.admin.UpdateUser; 49 50 import org.objectweb.joram.shared.admin.GetSubscriptions; 51 import org.objectweb.joram.shared.admin.GetSubscriptionsRep; 52 import org.objectweb.joram.shared.admin.GetSubscriptionMessageIds; 53 import org.objectweb.joram.shared.admin.GetSubscriptionMessageIdsRep; 54 import org.objectweb.joram.shared.admin.GetSubscriptionMessage; 55 import org.objectweb.joram.shared.admin.GetSubscriptionMessageRep; 56 import org.objectweb.joram.shared.admin.DeleteSubscriptionMessage; 57 import org.objectweb.joram.shared.admin.GetSubscription; 58 import org.objectweb.joram.shared.admin.GetSubscriptionRep; 59 import org.objectweb.joram.shared.admin.ClearSubscription; 60 61 import org.objectweb.joram.shared.client.*; 62 import org.objectweb.joram.shared.excepts.*; 63 64 import javax.management.openmbean.CompositeDataSupport ; 65 66 import fr.dyade.aaa.util.Debug; 67 import org.objectweb.util.monolog.api.BasicLevel; 68 import org.objectweb.util.monolog.api.Logger; 69 70 75 public class ProxyImpl implements java.io.Serializable , ProxyImplMBean { 76 public static Logger logger = Debug.getLogger(ProxyImpl.class.getName()); 77 78 79 protected long period = 60000L; 80 81 86 public long getPeriod() { 87 return period; 88 } 89 90 96 public void setPeriod(long period) { 97 if ((this.period == -1L) && (period != -1L)) { 98 Channel.sendTo(proxyAgent.getId(), new WakeUpNot()); 100 } 101 this.period = period; 102 } 103 104 108 private AgentId dmqId = null; 109 113 private Integer threshold = null; 114 115 121 private Hashtable contexts; 122 128 private Hashtable subsTable; 129 135 private Hashtable recoveredTransactions; 136 137 138 private long arrivalsCounter = 0; 139 140 141 private ProxyAgentItf proxyAgent; 142 148 private transient Hashtable topicsTable; 149 155 private transient Hashtable messagesTable; 156 157 162 private transient int activeCtxId; 163 164 private transient ClientContext activeCtx; 165 166 169 public ProxyImpl(ProxyAgentItf proxyAgent) { 170 contexts = new Hashtable (); 171 subsTable = new Hashtable (); 172 this.proxyAgent = proxyAgent; 173 if (logger.isLoggable(BasicLevel.DEBUG)) 174 logger.log(BasicLevel.DEBUG, this + ": created."); 175 } 176 177 180 public String toString() { 181 if (proxyAgent == null) 182 return "ProxyImpl:"; 183 else 184 return "ProxyImpl:" + proxyAgent.getId(); 185 } 186 187 188 194 public void initialize(boolean firstTime) throws Exception { 195 if (logger.isLoggable(BasicLevel.DEBUG)) 196 logger.log(BasicLevel.DEBUG, "--- " + this + " (re)initializing..."); 197 198 topicsTable = new Hashtable (); 199 messagesTable = new Hashtable (); 200 201 setActiveCtxId(-1); 202 203 205 ClientContext activeCtx; 207 AgentId destId; 208 for (Enumeration ctxIds = contexts.keys(); ctxIds.hasMoreElements();) { 209 activeCtx = (ClientContext) contexts.remove(ctxIds.nextElement()); 210 211 for (Enumeration queueIds = activeCtx.getDeliveringQueues(); 213 queueIds.hasMoreElements();) { 214 destId = (AgentId) queueIds.nextElement(); 215 proxyAgent.sendNot(destId, new DenyRequest(activeCtx.getId())); 216 217 if (logger.isLoggable(BasicLevel.DEBUG)) 218 logger.log(BasicLevel.DEBUG, 219 "Denies messages on queue " + destId.toString()); 220 } 221 222 Enumeration xids = activeCtx.getTxIds(); 224 Xid xid; 225 XACnxPrepare recoveredPrepare; 226 XACnxPrepare prepare; 227 while (xids.hasMoreElements()) { 228 if (recoveredTransactions == null) 229 recoveredTransactions = new Hashtable (); 230 231 xid = (Xid) xids.nextElement(); 232 233 recoveredPrepare = (XACnxPrepare) recoveredTransactions.get(xid); 234 prepare = activeCtx.getTxPrepare(xid); 235 236 if (recoveredPrepare == null) 237 recoveredTransactions.put(xid, prepare); 238 else { 239 recoveredPrepare.getSendings().addAll(prepare.getSendings()); 240 recoveredPrepare.getAcks().addAll(prepare.getAcks()); 241 } 242 } 243 244 for (Enumeration tempDests = activeCtx.getTempDestinations(); 246 tempDests.hasMoreElements();) { 247 destId = (AgentId) tempDests.nextElement(); 248 deleteTemporaryDestination(destId); 249 250 if (logger.isLoggable(BasicLevel.DEBUG)) 251 logger.log(BasicLevel.DEBUG, 252 "Deletes temporary destination " + destId.toString()); 253 } 254 } 255 256 Vector messages = Message.loadAll(getMsgTxname()); 258 259 if (subsTable.isEmpty()) { 260 Message.deleteAll(getMsgTxname()); 263 } 264 265 String subName; 267 ClientSubscription cSub; 268 Vector topics = new Vector (); 269 TopicSubscription tSub; 270 for (Enumeration subNames = subsTable.keys(); 271 subNames.hasMoreElements();) { 272 subName = (String ) subNames.nextElement(); 273 cSub = (ClientSubscription) subsTable.get(subName); 274 destId = cSub.getTopicId(); 275 if (! topics.contains(destId)) 276 topics.add(destId); 277 if (! cSub.getDurable()) 279 subsTable.remove(subName); 280 else { 282 cSub.setProxyAgent(proxyAgent); 283 cSub.reinitialize(getStringId(), 284 messagesTable, 285 messages, 286 true); 287 tSub = (TopicSubscription) topicsTable.get(destId); 288 if (tSub == null) { 289 tSub = new TopicSubscription(); 290 topicsTable.put(destId, tSub); 291 } 292 tSub.putSubscription(subName, cSub.getSelector()); 293 } 294 } 295 for (Enumeration topicIds = topics.elements(); 297 topicIds.hasMoreElements();) 298 updateSubscriptionToTopic((AgentId) topicIds.nextElement(), -1, -1); 299 } 300 301 private void setActiveCtxId(int activeCtxId) { 302 if (logger.isLoggable(BasicLevel.DEBUG)) 303 logger.log(BasicLevel.DEBUG, 304 "ProxyImpl.setActiveCtxId(" + activeCtxId + ')'); 305 this.activeCtxId = activeCtxId; 306 } 307 308 317 public void reactToClientRequest(int key, AbstractJmsRequest request) 318 { 319 try { 320 if (logger.isLoggable(BasicLevel.DEBUG)) 321 logger.log(BasicLevel.DEBUG, 322 "--- " + this + " got " + request.getClass().getName() + 323 " with id: " + request.getRequestId() + 324 " through activeCtx: " + key); 325 326 if (request instanceof ProducerMessages) 327 reactToClientRequest(key, (ProducerMessages) request); 328 else if (request instanceof ConsumerReceiveRequest) 329 reactToClientRequest(key, (ConsumerReceiveRequest) request); 330 else if (request instanceof ConsumerSetListRequest) 331 reactToClientRequest(key, (ConsumerSetListRequest) request); 332 else if (request instanceof QBrowseRequest) 333 reactToClientRequest(key, (QBrowseRequest) request); 334 else if (request instanceof JmsRequestGroup) 335 reactToClientRequest(key, (JmsRequestGroup) request); 336 else { 337 doReact(key, request); 338 } 339 } 340 catch (IllegalArgumentException iE) { 343 DestinationException dE = 344 new DestinationException("Proxy could not forward the request to" 345 + " incorrectly identified destination: " 346 + iE); 347 348 doReply(key, new MomExceptionReply(request.getRequestId(), dE)); 349 } 350 } 351 352 358 private void reactToClientRequest(int key, ProducerMessages req) { 359 if (logger.isLoggable(BasicLevel.DEBUG)) 360 logger.log(BasicLevel.DEBUG, 361 "ProxyImpl.reactToClientRequest(" + key + ',' + req + ')'); 362 363 AgentId destId = AgentId.fromString(req.getTarget()); 364 ClientMessages not = new ClientMessages( 365 key, 366 req.getRequestId(), 367 req.getMessages()); 368 369 setDmq(not); 370 371 372 if (destId.getTo() == proxyAgent.getId().getTo()) { 373 if (logger.isLoggable(BasicLevel.DEBUG)) 374 logger.log(BasicLevel.DEBUG, " -> local sending"); 375 not.setPersistent(false); 376 if (req.getAsyncSend()) { 377 not.setAsyncSend(true); 378 } 379 } else { 380 if (logger.isLoggable(BasicLevel.DEBUG)) 381 logger.log(BasicLevel.DEBUG, " -> remote sending"); 382 if (!req.getAsyncSend()) { 383 proxyAgent.sendNot(proxyAgent.getId(), 384 new SendReplyNot(key, req.getRequestId())); 385 } 386 } 387 388 proxyAgent.sendNot(destId, not); 389 } 390 391 private void setDmq(ClientMessages not) { 392 if (dmqId != null) { 394 not.setDMQId(dmqId); 395 } else { 396 not.setDMQId(DeadMQueueImpl.getId()); 397 } 398 } 399 400 405 private void reactToClientRequest(int key, ConsumerReceiveRequest req) 406 { 407 if (req.getQueueMode()) { 408 ReceiveRequest not = new ReceiveRequest( 409 key, 410 req.getRequestId(), 411 req.getSelector(), 412 req.getTimeToLive(), 413 req.getReceiveAck(), 414 null, 415 1); 416 AgentId to = AgentId.fromString(req.getTarget()); 417 if (to.getTo() == proxyAgent.getId().getTo()) { 418 if (logger.isLoggable(BasicLevel.DEBUG)) 419 logger.log(BasicLevel.DEBUG, " -> local receiving"); 420 not.setPersistent(false); 421 proxyAgent.sendNot(to, not); 422 } else { 423 proxyAgent.sendNot(to, not); 424 } 425 } else { 426 doReact(key, req); 427 } 428 } 429 430 435 private void reactToClientRequest(int key, ConsumerSetListRequest req) { 436 if (logger.isLoggable(BasicLevel.DEBUG)) 437 logger.log(BasicLevel.DEBUG, 438 "ProxyImp.reactToClientRequest(" + key + ',' + req + ')'); 439 if (req.getQueueMode()) { 440 ReceiveRequest not = new ReceiveRequest(key, 441 req.getRequestId(), 442 req.getSelector(), 443 0, 444 false, 445 req.getMessageIdsToAck(), 446 req.getMessageCount()); 447 AgentId to = AgentId.fromString(req.getTarget()); 448 if (to.getTo() == proxyAgent.getId().getTo()) { 449 if (logger.isLoggable(BasicLevel.DEBUG)) 450 logger.log(BasicLevel.DEBUG, " -> local sending"); 451 not.setPersistent(false); 452 proxyAgent.sendNot(to, not); 453 } else { 454 proxyAgent.sendNot(to, not); 455 } 456 } 457 else { 458 doReact(key, req); 459 } 460 } 461 462 466 private void reactToClientRequest(int key, QBrowseRequest req) 467 { 468 proxyAgent.sendNot(AgentId.fromString(req.getTarget()), 469 new BrowseRequest(key, 470 req.getRequestId(), 471 req.getSelector())); 472 } 473 474 private void reactToClientRequest(int key, JmsRequestGroup request) { 475 AbstractJmsRequest[] requests = request.getRequests(); 476 RequestBuffer rm = new RequestBuffer(proxyAgent); 477 for (int i = 0; i < requests.length; i++) { 478 if (requests[i] instanceof ProducerMessages) { 479 ProducerMessages pm =(ProducerMessages) requests[i]; 480 rm.put(key, pm); 481 } else { 482 reactToClientRequest(key, requests[i]); 483 } 484 } 485 486 rm.flush(); 487 } 488 489 508 public void react(AgentId from, Notification not) 509 throws UnknownNotificationException 510 { 511 if (not instanceof SetDMQRequest) 513 doReact(from, (SetDMQRequest) not); 514 else if (not instanceof SetThreshRequest) 515 doReact(from, (SetThreshRequest) not); 516 else if (not instanceof SetNbMaxMsgRequest) 517 doReact(from, (SetNbMaxMsgRequest) not); 518 else if (not instanceof Monit_GetNbMaxMsg) 519 doReact(from, (Monit_GetNbMaxMsg) not); 520 else if (not instanceof Monit_GetDMQSettings) 521 doReact(from, (Monit_GetDMQSettings) not); 522 else if (not instanceof SyncReply) 524 doReact((SyncReply) not); 525 else if (not instanceof AbstractReply) 527 doFwd(from, (AbstractReply) not); 528 else if (not instanceof AdminReply) 529 doReact((AdminReply) not); 530 else if (not instanceof UnknownAgent) 532 doReact((UnknownAgent) not); 533 else if (not instanceof UserAdminRequestNot) 534 doReact((UserAdminRequestNot) not); 535 else 536 throw new UnknownNotificationException("Unexpected notification: " 537 + not.getClass().getName()); 538 } 539 540 541 572 private void doReact(int key, AbstractJmsRequest request) 573 { 574 try { 575 if (! (request instanceof CnxConnectRequest)) 578 setCtx(key); 579 580 if (request instanceof GetAdminTopicRequest) 581 doReact(key, (GetAdminTopicRequest) request); 582 else if (request instanceof CnxConnectRequest) 583 doReact(key, (CnxConnectRequest) request); 584 else if (request instanceof CnxStartRequest) 585 doReact((CnxStartRequest) request); 586 else if (request instanceof CnxStopRequest) 587 doReact((CnxStopRequest) request); 588 else if (request instanceof SessCreateTQRequest) 589 doReact((SessCreateTQRequest) request); 590 else if (request instanceof SessCreateTTRequest) 591 doReact((SessCreateTTRequest) request); 592 else if (request instanceof ConsumerSubRequest) 593 doReact((ConsumerSubRequest) request); 594 else if (request instanceof ConsumerUnsubRequest) 595 doReact((ConsumerUnsubRequest) request); 596 else if (request instanceof ConsumerCloseSubRequest) 597 doReact((ConsumerCloseSubRequest) request); 598 else if (request instanceof ConsumerSetListRequest) 599 doReact((ConsumerSetListRequest) request); 600 else if (request instanceof ConsumerUnsetListRequest) 601 doReact((ConsumerUnsetListRequest) request); 602 else if (request instanceof ConsumerReceiveRequest) 603 doReact((ConsumerReceiveRequest) request); 604 else if (request instanceof ConsumerAckRequest) 605 doReact((ConsumerAckRequest) request); 606 else if (request instanceof ConsumerDenyRequest) 607 doReact((ConsumerDenyRequest) request); 608 else if (request instanceof SessAckRequest) 609 doReact((SessAckRequest) request); 610 else if (request instanceof SessDenyRequest) 611 doReact((SessDenyRequest) request); 612 else if (request instanceof TempDestDeleteRequest) 613 doReact((TempDestDeleteRequest) request); 614 else if (request instanceof XACnxPrepare) 615 doReact((XACnxPrepare) request); 616 else if (request instanceof XACnxCommit) 617 doReact((XACnxCommit) request); 618 else if (request instanceof XACnxRollback) 619 doReact((XACnxRollback) request); 620 else if (request instanceof XACnxRecoverRequest) 621 doReact((XACnxRecoverRequest) request); 622 else if (request instanceof CnxCloseRequest) 623 doReact(key, (CnxCloseRequest) request); 624 else if (request instanceof ActivateConsumerRequest) 625 doReact(key, (ActivateConsumerRequest) request); 626 else if (request instanceof CommitRequest) 627 doReact(key, (CommitRequest)request); 628 } 629 catch (MomException mE) { 630 if (logger.isLoggable(BasicLevel.ERROR)) 631 logger.log(BasicLevel.ERROR, mE); 632 633 doReply(new MomExceptionReply(request.getRequestId(), mE)); 635 } 636 } 637 638 647 private void doReact(int key, GetAdminTopicRequest req) 648 throws AccessException 649 { 650 653 doReply( 654 key, 655 new GetAdminTopicReply( 656 req, 657 AdminTopicImpl.getReference().getId().toString())); 658 } 659 660 671 private void doReact(int key, CnxConnectRequest req) 672 throws DestinationException { 673 proxyAgent.setSave(); 675 676 setActiveCtxId(key); 677 activeCtx = new ClientContext(proxyAgent.getId(), key); 678 activeCtx.setProxyAgent(proxyAgent); 679 contexts.put(new Integer (key), activeCtx); 680 681 if (logger.isLoggable(BasicLevel.DEBUG)) 682 logger.log(BasicLevel.DEBUG, "Connection " + key + " opened."); 683 684 doReply(new CnxConnectReply(req, key, proxyAgent.getId().toString())); 685 } 686 687 694 private void doReact(CnxStartRequest req) { 695 activeCtx.setActivated(true); 696 697 for (Enumeration deliveries = activeCtx.getPendingDeliveries(); 699 deliveries.hasMoreElements();) 700 doReply((AbstractJmsReply) deliveries.nextElement()); 701 702 activeCtx.clearPendingDeliveries(); 704 } 705 706 712 private void doReact(CnxStopRequest req) { 713 activeCtx.setActivated(false); 714 doReply(new ServerReply(req)); 715 } 716 717 729 private void doReact(SessCreateTQRequest req) throws RequestException { 730 try { 731 Queue queue = new Queue(); 732 queue.init(proxyAgent.getId(), null); 733 AgentId qId = queue.getId(); 734 735 queue.deploy(); 736 737 proxyAgent.sendNot(qId, new SetRightRequest(null, null, 2)); 739 740 activeCtx.addTemporaryDestination(qId); 741 742 SessCreateTDReply reply = new SessCreateTDReply(req, qId.toString()); 743 proxyAgent.sendNot(proxyAgent.getId(), 744 new SyncReply(activeCtxId, reply)); 745 746 proxyAgent.sendNot(AdminTopic.getDefault(), 747 new RegisterTmpDestNot(qId, false, true)); 748 749 if (logger.isLoggable(BasicLevel.DEBUG)) 750 logger.log(BasicLevel.DEBUG, "Temporary queue " + qId + " created."); 751 } 752 catch (java.io.IOException iE) { 753 throw new RequestException("Could not create temporary queue: " + iE); 754 } 755 } 756 757 769 private void doReact(SessCreateTTRequest req) throws RequestException { 770 Topic topic = new Topic(); 771 topic.init(proxyAgent.getId(), null); 772 AgentId tId = topic.getId(); 773 774 try { 775 topic.deploy(); 776 777 proxyAgent.sendNot(tId, new SetRightRequest(null, null, 2)); 779 780 activeCtx.addTemporaryDestination(tId); 781 782 SessCreateTDReply reply = new SessCreateTDReply(req, tId.toString()); 783 proxyAgent.sendNot(proxyAgent.getId(), 784 new SyncReply(activeCtxId, reply)); 785 786 proxyAgent.sendNot(AdminTopic.getDefault(), 787 new RegisterTmpDestNot(tId, true, true)); 788 789 if (logger.isLoggable(BasicLevel.DEBUG)) 790 logger.log(BasicLevel.DEBUG, "Temporary topic" + tId + " created."); 791 } catch (java.io.IOException iE) { 792 topic = null; 793 throw new RequestException("Could not deploy temporary topic " 794 + tId + ": " + iE); 795 } 796 } 797 798 805 private void doReact(ConsumerSubRequest req) throws StateException { 806 AgentId topicId = AgentId.fromString(req.getTarget()); 807 String subName = req.getSubName(); 808 809 boolean newTopic = ! topicsTable.containsKey(topicId); 810 boolean newSub = ! subsTable.containsKey(subName); 811 812 TopicSubscription tSub; 813 ClientSubscription cSub; 814 815 boolean sent = false; 817 818 if (newTopic) { tSub = new TopicSubscription(); 820 topicsTable.put(topicId, tSub); 821 } else { tSub = (TopicSubscription) topicsTable.get(topicId); 823 } 824 825 if (newSub) { proxyAgent.setSave(); 828 cSub = new ClientSubscription(proxyAgent.getId(), 829 activeCtxId, 830 req.getRequestId(), 831 req.getDurable(), 832 topicId, 833 req.getSubName(), 834 req.getSelector(), 835 req.getNoLocal(), 836 dmqId, 837 threshold, 838 messagesTable); 839 cSub.setProxyAgent(proxyAgent); 840 841 if (logger.isLoggable(BasicLevel.DEBUG)) 842 logger.log(BasicLevel.DEBUG, "Subscription " + subName + " created."); 843 844 subsTable.put(subName, cSub); 845 tSub.putSubscription(subName, req.getSelector()); 846 sent = 847 updateSubscriptionToTopic(topicId, activeCtxId, req.getRequestId()); 848 } else { cSub = (ClientSubscription) subsTable.get(subName); 850 851 if (cSub.getActive()) 852 throw new StateException("The durable subscription " + subName + 853 " has already been activated."); 854 855 boolean updatedTopic = ! topicId.equals(cSub.getTopicId()); 857 if (updatedTopic) { 858 TopicSubscription oldTSub = 859 (TopicSubscription) topicsTable.get(cSub.getTopicId()); 860 oldTSub.removeSubscription(subName); 861 updateSubscriptionToTopic(cSub.getTopicId(), -1, -1); 862 } 863 864 boolean updatedSelector; 866 if (req.getSelector() == null && cSub.getSelector() != null) 867 updatedSelector = true; 868 else if (req.getSelector() != null && cSub.getSelector() == null) 869 updatedSelector = true; 870 else if (req.getSelector() == null && cSub.getSelector() == null) 871 updatedSelector = false; 872 else 873 updatedSelector = ! req.getSelector().equals(cSub.getSelector()); 874 875 cSub.reactivate(activeCtxId, 877 req.getRequestId(), 878 topicId, 879 req.getSelector(), 880 req.getNoLocal()); 881 882 if (logger.isLoggable(BasicLevel.DEBUG)) 883 logger.log(BasicLevel.DEBUG, 884 "Subscription " + subName + " reactivated."); 885 886 if (updatedTopic || updatedSelector) { 888 tSub.putSubscription(subName, req.getSelector()); 889 sent = updateSubscriptionToTopic(topicId, 890 activeCtxId, 891 req.getRequestId()); 892 } 893 } 894 activeCtx.addSubName(subName); 896 897 if (! sent) 899 proxyAgent.sendNot(proxyAgent.getId(), 900 new SyncReply(activeCtxId, new ServerReply(req))); 901 } 902 903 912 private void doReact(ConsumerSetListRequest req) throws DestinationException 913 { 914 String subName = req.getTarget(); 916 ClientSubscription sub = null; 917 if (subName != null) 918 sub = (ClientSubscription) subsTable.get(subName); 919 920 if (sub == null) 921 throw new DestinationException("Can't set a listener on the non existing subscription: " + subName); 922 923 sub.setListener(req.getRequestId()); 924 925 ConsumerMessages consM = sub.deliver(); 926 if (consM != null) { 927 if (activeCtx.getActivated()) 928 doReply(consM); 929 else 930 activeCtx.addPendingDelivery(consM); 931 } 932 } 933 934 941 private void doReact(ConsumerUnsetListRequest req) 942 throws DestinationException { 943 if (req.getQueueMode()) { 945 activeCtx.cancelReceive(req.getCancelledRequestId()); 946 AgentId to = AgentId.fromString(req.getTarget()); 947 proxyAgent.sendNot( 948 to, 949 new AbortReceiveRequest(activeCtx.getId(), 950 req.getRequestId(), 951 req.getCancelledRequestId())); 952 } 953 } 954 955 962 private void doReact(ConsumerCloseSubRequest req) throws DestinationException 963 { 964 String subName = req.getTarget(); 966 ClientSubscription sub = null; 967 if (subName != null) 968 sub = (ClientSubscription) subsTable.get(subName); 969 970 if (sub == null) 971 throw new DestinationException("Can't desactivate non existing subscription: " + subName); 972 973 activeCtx.removeSubName(subName); 975 sub.deactivate(); 976 977 doReply(new ServerReply(req)); 979 } 980 981 987 private void doReact(ConsumerUnsubRequest req) throws DestinationException { 988 proxyAgent.setSave(); 990 991 String subName = req.getTarget(); 993 ClientSubscription sub = null; 994 if (subName != null) 995 sub = (ClientSubscription) subsTable.get(subName); 996 if (sub == null) 997 throw new DestinationException("Can't unsubscribe non existing subscription: " + subName); 998 999 if (logger.isLoggable(BasicLevel.DEBUG)) 1000 logger.log(BasicLevel.DEBUG, "Deleting subscription " + subName); 1001 1002 AgentId topicId = sub.getTopicId(); 1004 TopicSubscription tSub = (TopicSubscription) topicsTable.get(topicId); 1005 tSub.removeSubscription(subName); 1006 updateSubscriptionToTopic(topicId, -1, -1); 1007 1008 sub.delete(); 1010 activeCtx.removeSubName(subName); 1011 subsTable.remove(subName); 1012 1013 proxyAgent.sendNot(proxyAgent.getId(), 1015 new SyncReply(activeCtxId, new ServerReply(req))); 1016 } 1017 1018 1027 private void doReact(ConsumerReceiveRequest req) 1028 throws DestinationException { 1029 if (logger.isLoggable(BasicLevel.DEBUG)) 1030 logger.log(BasicLevel.DEBUG, "ProxyImpl.doReact(" + req + ')'); 1031 1032 String subName = req.getTarget(); 1033 ClientSubscription sub = null; 1034 if (subName != null) 1035 sub = (ClientSubscription) subsTable.get(subName); 1036 1037 if (sub == null) 1038 throw new DestinationException("Can't request a message from the unknown subscription: " + subName); 1039 1040 sub.setReceiver(req.getRequestId(), req.getTimeToLive()); 1042 ConsumerMessages consM = sub.deliver(); 1043 1044 if (consM != null && req.getReceiveAck()) { 1045 Vector messageList = consM.getMessages(); 1046 for (int i = 0; i < messageList.size(); i++) { 1047 Message msg = (Message)messageList.elementAt(i); 1048 sub.acknowledge(msg.getIdentifier()); 1049 } 1050 } 1051 1052 if (consM == null && req.getTimeToLive() == -1) { 1055 if (logger.isLoggable(BasicLevel.DEBUG)) 1056 logger.log(BasicLevel.DEBUG, " -> immediate delivery"); 1057 sub.unsetReceiver(); 1058 consM = new ConsumerMessages(req.getRequestId(), subName, false); 1059 } 1060 1061 if (consM != null && activeCtx.getActivated()) { 1063 doReply(consM); 1064 } else if (consM != null) { 1065 activeCtx.addPendingDelivery(consM); 1066 } 1067 } 1068 1069 1074 private void doReact(SessAckRequest req) 1075 { 1076 if (req.getQueueMode()) { 1077 AgentId qId = AgentId.fromString(req.getTarget()); 1078 Vector ids = req.getIds(); 1079 1080 AcknowledgeRequest not = 1081 new AcknowledgeRequest(activeCtxId, 1082 req.getRequestId(), 1083 ids); 1084 if (qId.getTo() == proxyAgent.getId().getTo()) { 1085 if (logger.isLoggable(BasicLevel.DEBUG)) 1086 logger.log(BasicLevel.DEBUG, " -> local acking"); 1087 not.setPersistent(false); 1088 } 1089 1090 proxyAgent.sendNot(qId, not); 1091 } 1092 else { 1093 String subName = req.getTarget(); 1094 ClientSubscription sub = (ClientSubscription) subsTable.get(subName); 1095 if (sub != null) 1096 sub.acknowledge(req.getIds().elements()); 1097 } 1098 } 1099 1100 1105 private void doReact(SessDenyRequest req) { 1106 if (req.getQueueMode()) { 1107 AgentId qId = AgentId.fromString(req.getTarget()); 1108 Vector ids = req.getIds(); 1109 proxyAgent.sendNot(qId, 1110 new DenyRequest(activeCtxId, req.getRequestId(), ids)); 1111 1112 if (! req.getDoNotAck()) 1114 proxyAgent.sendNot(proxyAgent.getId(), 1115 new SyncReply(activeCtxId, new ServerReply(req))); 1116 } 1117 else { 1118 String subName = req.getTarget(); 1119 ClientSubscription sub = (ClientSubscription) subsTable.get(subName); 1120 1121 if (sub == null) 1122 return; 1123 1124 sub.deny(req.getIds().elements()); 1125 1126 ConsumerMessages consM = sub.deliver(); 1128 if (consM != null && activeCtx.getActivated()) 1130 doReply(consM); 1131 else if (consM != null) 1132 activeCtx.addPendingDelivery(consM); 1133 } 1134 } 1135 1136 1141 private void doReact(ConsumerAckRequest req) 1142 { 1143 if (req.getQueueMode()) { 1144 AgentId qId = AgentId.fromString(req.getTarget()); 1145 AcknowledgeRequest not = new AcknowledgeRequest(activeCtxId, 1146 req.getRequestId(), 1147 req.getIds()); 1148 if (qId.getTo() == proxyAgent.getId().getTo()) { 1149 if (logger.isLoggable(BasicLevel.DEBUG)) 1150 logger.log(BasicLevel.DEBUG, " -> local acking"); 1151 not.setPersistent(false); 1152 proxyAgent.sendNot(qId, not); 1153 } else { 1154 proxyAgent.sendNot(qId, not); 1155 } 1156 } else { 1157 String subName = req.getTarget(); 1158 ClientSubscription sub = (ClientSubscription) subsTable.get(subName); 1159 if (sub != null) { 1160 sub.acknowledge(req.getIds().elements()); 1161 } 1162 } 1163 } 1164 1165 1172 private void doReact(ConsumerDenyRequest req) { 1173 if (req.getQueueMode()) { 1174 AgentId qId = AgentId.fromString(req.getTarget()); 1175 String id = req.getId(); 1176 proxyAgent.sendNot(qId, 1177 new DenyRequest(activeCtxId, req.getRequestId(), id)); 1178 1179 if (! req.getDoNotAck()) 1181 proxyAgent.sendNot(proxyAgent.getId(), 1182 new SyncReply(activeCtxId, new ServerReply(req))); 1183 } else { 1184 String subName = req.getTarget(); 1185 ClientSubscription sub = (ClientSubscription) subsTable.get(subName); 1186 1187 if (sub == null) 1188 return; 1189 1190 Vector ids = new Vector (); 1191 ids.add(req.getId()); 1192 sub.deny(ids.elements()); 1193 1194 ConsumerMessages consM = sub.deliver(); 1196 if (consM != null && activeCtx.getActivated()) 1198 doReply(consM); 1199 else if (consM != null) 1200 activeCtx.addPendingDelivery(consM); 1201 } 1202 } 1203 1204 1212 private void doReact(TempDestDeleteRequest req) { 1213 AgentId tempId = AgentId.fromString(req.getTarget()); 1215 activeCtx.removeTemporaryDestination(tempId); 1216 1217 deleteTemporaryDestination(tempId); 1219 1220 proxyAgent.sendNot(proxyAgent.getId(), 1222 new SyncReply(activeCtxId, new ServerReply(req))); 1223 } 1224 1225 private void deleteTemporaryDestination(AgentId destId) { 1226 proxyAgent.sendNot(destId, new DeleteNot()); 1227 proxyAgent.sendNot(AdminTopic.getDefault(), 1228 new RegisterTmpDestNot(destId, false, false)); 1229 } 1230 1231 1239 private void doReact(XACnxPrepare req) throws StateException { 1240 try { 1241 Xid xid = new Xid(req.getBQ(), req.getFI(), req.getGTI()); 1242 activeCtx.registerTxPrepare(xid, req); 1243 doReply(new ServerReply(req)); 1244 } 1245 catch (Exception exc) { 1246 throw new StateException(exc.getMessage()); 1247 } 1248 } 1249 1250 1260 private void doReact(XACnxCommit req) throws StateException { 1261 Xid xid = new Xid(req.getBQ(), req.getFI(), req.getGTI()); 1262 1263 XACnxPrepare prepare = activeCtx.getTxPrepare(xid); 1264 1265 if (prepare == null) 1266 throw new StateException("Unknown transaction identifier."); 1267 1268 Vector sendings = prepare.getSendings(); 1269 Vector acks = prepare.getAcks(); 1270 1271 ProducerMessages pM; 1272 ClientMessages not; 1273 while (! sendings.isEmpty()) { 1274 pM = (ProducerMessages) sendings.remove(0); 1275 not = new ClientMessages(activeCtxId, 1276 pM.getRequestId(), 1277 pM.getMessages()); 1278 proxyAgent.sendNot(AgentId.fromString(pM.getTarget()), not); 1279 } 1280 1281 while (! acks.isEmpty()) 1282 doReact((SessAckRequest) acks.remove(0)); 1283 1284 doReply(new ServerReply(req)); 1285 } 1286 1287 1292 private void doReact(XACnxRollback req) { 1293 Xid xid = new Xid(req.getBQ(), req.getFI(), req.getGTI()); 1294 1295 String queueName; 1296 AgentId qId; 1297 Vector ids; 1298 for (Enumeration queues = req.getQueues(); queues.hasMoreElements();) { 1299 queueName = (String ) queues.nextElement(); 1300 qId = AgentId.fromString(queueName); 1301 ids = req.getQueueIds(queueName); 1302 proxyAgent.sendNot(qId, 1303 new DenyRequest(activeCtxId, req.getRequestId(), ids)); 1304 } 1305 1306 String subName; 1307 ClientSubscription sub; 1308 ConsumerMessages consM; 1309 for (Enumeration subs = req.getSubs(); subs.hasMoreElements();) { 1310 subName = (String ) subs.nextElement(); 1311 sub = (ClientSubscription) subsTable.get(subName); 1312 if (sub != null) { 1313 sub.deny(req.getSubIds(subName).elements()); 1314 1315 consM = sub.deliver(); 1316 if (consM != null && activeCtx.getActivated()) 1317 doReply(consM); 1318 else if (consM != null) 1319 activeCtx.addPendingDelivery(consM); 1320 } 1321 } 1322 1323 XACnxPrepare prepare = activeCtx.getTxPrepare(xid); 1324 1325 if (prepare != null) { 1326 Vector acks = prepare.getAcks(); 1327 1328 SessAckRequest ack; 1329 while (! acks.isEmpty()) { 1330 ack = (SessAckRequest) acks.remove(0); 1331 doReact(new SessDenyRequest(ack.getTarget(), 1332 ack.getIds(), 1333 ack.getQueueMode(), 1334 true)); 1335 } 1336 } 1337 1338 proxyAgent.sendNot(proxyAgent.getId(), 1339 new SyncReply(activeCtxId, new ServerReply(req))); 1340 } 1341 1342 1352 private void doReact(XACnxRecoverRequest req) 1353 throws StateException { 1354 proxyAgent.setSave(); 1356 1357 Vector bqs = new Vector (); 1358 Vector fis = new Vector (); 1359 Vector gtis = new Vector (); 1360 if (recoveredTransactions != null) { 1361 Enumeration keys = recoveredTransactions.keys(); 1362 Xid xid; 1363 while (keys.hasMoreElements()) { 1364 xid = (Xid) keys.nextElement(); 1365 bqs.add(xid.bq); 1366 fis.add(new Integer (xid.fi)); 1367 gtis.add(xid.gti); 1368 try { 1369 activeCtx.registerTxPrepare(xid, 1370 (XACnxPrepare) recoveredTransactions.remove(xid)); 1371 } 1372 catch (Exception exc) { 1373 throw new StateException("Recovered transaction branch has already been prepared by the RM."); 1374 } 1375 } 1376 } 1377 recoveredTransactions = null; 1378 doReply(new XACnxRecoverReply(req, bqs, fis, gtis)); 1379 } 1380 1381 1386 private void doReact(AgentId from, SetDMQRequest not) { 1387 proxyAgent.setSave(); 1389 1390 dmqId = not.getDmqId(); 1391 1392 for (Enumeration keys = subsTable.keys(); keys.hasMoreElements();) 1393 ((ClientSubscription) subsTable.get(keys.nextElement())).setDMQId(dmqId); 1394 1395 proxyAgent.sendNot(from, new AdminReply(not, true, "DMQ set: " + dmqId)); 1396 } 1397 1398 1403 private void doReact(AgentId from, SetThreshRequest not) { 1404 proxyAgent.setSave(); 1406 1407 threshold = not.getThreshold(); 1408 1409 for (Enumeration keys = subsTable.keys(); keys.hasMoreElements();) 1410 ((ClientSubscription) 1411 subsTable.get(keys.nextElement())).setThreshold(not.getThreshold()); 1412 1413 proxyAgent.sendNot(from, 1414 new AdminReply(not, 1415 true, 1416 "Threshold set: " + threshold)); 1417 } 1418 1419 1423 protected void doReact(AgentId from, SetNbMaxMsgRequest not) { 1424 int nbMaxMsg = not.getNbMaxMsg(); 1425 String subName = not.getSubName(); 1426 1427 ClientSubscription sub = (ClientSubscription) subsTable.get(subName); 1428 if (sub != null) { 1429 sub.setNbMaxMsg(nbMaxMsg); 1430 proxyAgent.sendNot(from, 1431 new AdminReply(not, 1432 true, 1433 "NbMaxMsg set: " + nbMaxMsg + " on " + subName)); 1434 } else { 1435 proxyAgent.sendNot(from, 1436 new AdminReply(not, 1437 false, 1438 "NbMaxMsg not set: " + nbMaxMsg + " on " + subName)); 1439 } 1440 } 1441 1442 1449 protected void doReact(AgentId from, Monit_GetNbMaxMsg not) { 1450 int nbMaxMsg = -1; 1451 String subName = not.getSubName(); 1452 ClientSubscription sub = (ClientSubscription) subsTable.get(subName); 1453 if (sub != null) 1454 nbMaxMsg = sub.getNbMaxMsg(); 1455 1456 Channel.sendTo(from, new Monit_GetNbMaxMsgRep(not,nbMaxMsg)); 1457 } 1458 1459 1468 public int getNbMaxMsg(String subName) { 1469 int nbMaxMsg = -1; 1470 ClientSubscription sub = (ClientSubscription) subsTable.get(subName); 1471 if (sub != null) 1472 nbMaxMsg = sub.getNbMaxMsg(); 1473 return nbMaxMsg; 1474 } 1475 1476 1484 public void setNbMaxMsg(String subName, int nbMaxMsg) { 1485 ClientSubscription sub = (ClientSubscription) subsTable.get(subName); 1486 if (sub != null) 1487 sub.setNbMaxMsg(nbMaxMsg); 1488 } 1489 1490 1494 private void doReact(AgentId from, Monit_GetDMQSettings not) 1495 { 1496 String id = null; 1497 if (dmqId != null) 1498 id = dmqId.toString(); 1499 proxyAgent.sendNot(from, new Monit_GetDMQSettingsRep(not, id, threshold)); 1500 } 1501 1502 1507 private void doReact(SyncReply not) 1508 { 1509 doReply(not.key, not.reply); 1510 } 1511 1512 1517 private void doReact(int key, CnxCloseRequest req) { 1518 proxyAgent.setSave(); 1520 1521 1523 AgentId id; 1525 for (Enumeration ids = activeCtx.getDeliveringQueues(); ids 1526 .hasMoreElements();) { 1527 id = (AgentId) ids.nextElement(); 1528 proxyAgent.sendNot(id, new DenyRequest(key)); 1529 } 1530 1531 String subName = null; 1533 ClientSubscription sub; 1534 Vector topics = new Vector (); 1535 for (Enumeration subs = activeCtx.getActiveSubs(); subs.hasMoreElements();) { 1536 subName = (String ) subs.nextElement(); 1537 sub = (ClientSubscription) subsTable.get(subName); 1538 1539 if (logger.isLoggable(BasicLevel.DEBUG)) 1540 logger.log(BasicLevel.DEBUG, "Deactivate subscription " 1541 + subName + ", topic id = " + sub.getTopicId()); 1542 1543 if (sub.getDurable()) { 1544 sub.deactivate(); 1545 1546 if (logger.isLoggable(BasicLevel.DEBUG)) 1547 logger.log(BasicLevel.DEBUG, "Durable subscription" 1548 + subName + " de-activated."); 1549 } else { 1550 if (logger.isLoggable(BasicLevel.DEBUG)) 1551 logger.log(BasicLevel.DEBUG, " -> topicsTable = " 1552 + topicsTable); 1553 1554 sub.delete(); 1555 subsTable.remove(subName); 1556 TopicSubscription tSub = (TopicSubscription) topicsTable.get(sub 1557 .getTopicId()); 1558 tSub.removeSubscription(subName); 1559 1560 if (!topics.contains(sub.getTopicId())) 1561 topics.add(sub.getTopicId()); 1562 1563 if (logger.isLoggable(BasicLevel.DEBUG)) 1564 logger.log(BasicLevel.DEBUG, "Temporary subscription" 1565 + subName + " deleted."); 1566 } 1567 } 1568 for (Enumeration topicIds = topics.elements(); topicIds.hasMoreElements();) 1570 updateSubscriptionToTopic((AgentId) topicIds.nextElement(), -1, -1); 1571 1572 AgentId destId; 1574 for (Enumeration dests = activeCtx.getTempDestinations(); dests 1575 .hasMoreElements();) { 1576 destId = (AgentId) dests.nextElement(); 1577 activeCtx.removeTemporaryDestination(destId); 1578 deleteTemporaryDestination(destId); 1579 1580 if (logger.isLoggable(BasicLevel.DEBUG)) 1581 logger.log(BasicLevel.DEBUG, "Deletes temporary" 1582 + " destination " + destId.toString()); 1583 } 1584 1585 Enumeration xids = activeCtx.getTxIds(); 1587 Xid xid; 1588 XACnxPrepare recoveredPrepare; 1589 XACnxPrepare prepare; 1590 while (xids.hasMoreElements()) { 1591 if (recoveredTransactions == null) 1592 recoveredTransactions = new Hashtable (); 1593 1594 xid = (Xid) xids.nextElement(); 1595 1596 recoveredPrepare = (XACnxPrepare) recoveredTransactions.get(xid); 1597 prepare = activeCtx.getTxPrepare(xid); 1598 1599 if (recoveredPrepare == null) 1600 recoveredTransactions.put(xid, prepare); 1601 else { 1602 recoveredPrepare.getSendings().addAll(prepare.getSendings()); 1603 recoveredPrepare.getAcks().addAll(prepare.getAcks()); 1604 } 1605 } 1606 1607 contexts.remove(new Integer (key)); 1609 activeCtx = null; 1610 setActiveCtxId(-1); 1611 1612 CnxCloseReply reply = new CnxCloseReply(); 1613 reply.setCorrelationId(req.getRequestId()); 1614 proxyAgent.sendToClient(key, reply); 1615 } 1616 1617 private void doReact(int key, ActivateConsumerRequest req) { 1618 String subName = req.getTarget(); 1619 ClientSubscription sub = (ClientSubscription) subsTable.get(subName); 1620 sub.setActive(req.getActivate()); 1621 } 1622 1623 private void doReact(int key, CommitRequest req) { 1624 int asyncReplyCount = 0; 1626 1627 Enumeration pms = req.getProducerMessages(); 1628 if (pms != null) { 1629 while (pms.hasMoreElements()) { 1630 ProducerMessages pm = (ProducerMessages) pms.nextElement(); 1631 AgentId destId = AgentId.fromString(pm.getTarget()); 1632 ClientMessages not = new ClientMessages(key, 1633 req.getRequestId(), pm.getMessages()); 1634 setDmq(not); 1635 if (destId.getTo() == proxyAgent.getId().getTo()) { 1636 not.setPersistent(false); 1638 if (req.getAsyncSend()) { 1639 not.setAsyncSend(true); 1640 } else { 1641 asyncReplyCount++; 1642 } 1643 } 1644 proxyAgent.sendNot(destId, not); 1645 } 1646 } 1647 1648 Enumeration acks = req.getAckRequests(); 1649 if (acks != null) { 1650 while (acks.hasMoreElements()) { 1651 SessAckRequest sar = (SessAckRequest) acks.nextElement(); 1652 if (sar.getQueueMode()) { 1653 AgentId qId = AgentId.fromString(sar.getTarget()); 1654 Vector ids = sar.getIds(); 1655 AcknowledgeRequest not = new AcknowledgeRequest(activeCtxId, req 1656 .getRequestId(), ids); 1657 if (qId.getTo() == proxyAgent.getId().getTo()) { 1658 not.setPersistent(false); 1660 } 1662 1663 proxyAgent.sendNot(qId, not); 1664 } else { 1665 String subName = sar.getTarget(); 1666 ClientSubscription sub = (ClientSubscription) subsTable.get(subName); 1667 if (sub != null) { 1668 sub.acknowledge(sar.getIds().elements()); 1669 proxyAgent.setSave(); 1670 } 1671 } 1672 } 1673 } 1674 1675 if (!req.getAsyncSend()) { 1676 if (asyncReplyCount == 0) { 1677 proxyAgent.sendNot(proxyAgent.getId(), new SendReplyNot(key, req 1678 .getRequestId())); 1679 } else { 1680 activeCtx.addMultiReplyContext(req.getRequestId(), asyncReplyCount); 1684 } 1685 } 1686 } 1688 1689 1701 private void doFwd(AgentId from, AbstractReply rep) { 1702 if (logger.isLoggable(BasicLevel.DEBUG)) 1703 logger.log(BasicLevel.DEBUG, 1704 "--- " + this + " got " + rep.getClass().getName() + 1705 " with id: " + rep.getCorrelationId() + " from: " + from); 1706 1707 if (rep instanceof QueueMsgReply) 1708 doFwd(from, (QueueMsgReply) rep); 1709 else if (rep instanceof BrowseReply) 1710 doFwd((BrowseReply) rep); 1711 else if (rep instanceof SubscribeReply) 1712 doFwd((SubscribeReply) rep); 1713 else if (rep instanceof TopicMsgsReply) 1714 doFwd(from, (TopicMsgsReply) rep); 1715 else if (rep instanceof ExceptionReply) 1716 doReact(from, (ExceptionReply) rep); 1717 else { 1718 if (logger.isLoggable(BasicLevel.ERROR)) 1719 logger.log(BasicLevel.ERROR, "Unexpected reply: " + rep); 1720 } 1721 } 1722 1723 1724 1731 private void doFwd(AgentId from, QueueMsgReply rep) { 1732 if (logger.isLoggable(BasicLevel.DEBUG)) 1733 logger.log(BasicLevel.DEBUG, 1734 "ProxyImpl.doFwd(" + from + ',' + rep + ')'); 1735 1736 try { 1737 setCtx(rep.getClientContext()); 1739 1740 if (rep.getCorrelationId() == activeCtx.getCancelledReceive()) { 1743 if (logger.isLoggable(BasicLevel.DEBUG)) 1744 logger.log(BasicLevel.DEBUG, 1745 " -> cancelled receive: id=" + activeCtx.getCancelledReceive()); 1746 1747 if (rep.getSize() > 0) { 1748 Vector msgList = rep.getMessages(); 1749 for (int i = 0; i < msgList.size(); i++) { 1750 Message msg = new Message((org.objectweb.joram.shared.messages.Message) msgList.elementAt(i)); 1751 String msgId = msg.getIdentifier(); 1752 1753 if (logger.isLoggable(BasicLevel.INFO)) 1754 logger.log(BasicLevel.INFO, " -> denying message: " + msgId); 1755 1756 proxyAgent.sendNot(from, 1757 new DenyRequest(0, rep.getCorrelationId(), msgId)); 1758 } 1759 } 1760 } else { 1761 if (logger.isLoggable(BasicLevel.DEBUG)) 1762 logger.log(BasicLevel.DEBUG, " -> reply"); 1763 1764 ConsumerMessages jRep; 1765 1766 if (rep.getSize() > 0) { 1769 jRep = new ConsumerMessages( 1770 rep.getCorrelationId(), 1771 rep.getMessages(), 1772 from.toString(), 1773 true); 1774 activeCtx.addDeliveringQueue(from); 1775 } else { 1776 jRep = new ConsumerMessages( 1777 rep.getCorrelationId(), 1778 (Vector )null, 1779 from.toString(), 1780 true); 1781 } 1782 1783 if (activeCtx.getActivated()) { 1785 doReply(jRep); 1786 } else { 1787 if (logger.isLoggable(BasicLevel.DEBUG)) 1788 logger.log(BasicLevel.DEBUG, " -> buffer the reply"); 1789 activeCtx.addPendingDelivery(jRep); 1790 } 1791 } 1792 } catch (StateException pE) { 1793 if (logger.isLoggable(BasicLevel.DEBUG)) 1795 logger.log(BasicLevel.DEBUG, "", pE); 1796 if (rep.getMessages().size() > 0) { 1797 Vector msgList = rep.getMessages(); 1798 for (int i = 0; i < msgList.size(); i++) { 1799 Message msg = new Message((org.objectweb.joram.shared.messages.Message) msgList.elementAt(i)); 1800 String msgId = msg.getIdentifier(); 1801 1802 if (logger.isLoggable(BasicLevel.INFO)) 1803 logger.log(BasicLevel.INFO, "Denying message: " + msgId); 1804 1805 proxyAgent.sendNot(from, 1806 new DenyRequest(0, rep.getCorrelationId(), msgId)); 1807 } 1808 } 1809 } 1810 } 1811 1812 1813 1818 private void doFwd(BrowseReply rep) { 1819 try { 1820 setCtx(rep.getClientContext()); 1822 doReply(new QBrowseReply(rep.getCorrelationId(), 1823 rep.getMessages())); 1824 } catch (StateException pE) { 1825 } 1827 } 1828 1829 1833 private void doFwd(SubscribeReply rep) { 1834 try { 1835 setCtx(rep.getClientContext()); 1836 doReply(new ServerReply(rep.getCorrelationId())); 1837 } catch (StateException pE) { 1838 } 1840 } 1841 1842 transient String msgTxname = null; 1843 1844 protected final String getMsgTxname() { 1845 if (msgTxname == null) 1846 msgTxname = 'M' + proxyAgent.getId().toString() + '_'; 1847 return msgTxname; 1848 } 1849 1850 protected final void setMsgTxName(Message msg) { 1851 if (msg.getTxName() == null) 1852 msg.setTxName(getMsgTxname() + msg.order); 1853 } 1854 1855 1859 private void doFwd(AgentId from, TopicMsgsReply rep) { 1860 TopicSubscription tSub = (TopicSubscription) topicsTable.get(from); 1862 if (tSub == null || tSub.isEmpty()) return; 1863 1864 String subName; 1865 ClientSubscription sub; 1866 1867 Vector messages = new Vector (); 1870 for (Enumeration msgs = rep.getMessages().elements(); 1871 msgs.hasMoreElements();) { 1872 Message message = new Message((org.objectweb.joram.shared.messages.Message) msgs.nextElement()); 1873 message.order = arrivalsCounter++; 1875 messages.add(message); 1876 } 1877 1878 for (Enumeration names = tSub.getNames(); names.hasMoreElements();) { 1879 subName = (String ) names.nextElement(); 1880 sub = (ClientSubscription) subsTable.get(subName); 1881 if (sub == null) continue; 1882 1883 sub.browseNewMessages(messages); 1885 } 1886 1887 for (Enumeration msgs = messages.elements(); msgs.hasMoreElements();) { 1889 Message message = (Message) msgs.nextElement(); 1890 1891 if (message.durableAcksCounter > 0) { 1892 if (logger.isLoggable(BasicLevel.DEBUG)) 1893 logger.log(BasicLevel.DEBUG, " -> save message " + message); 1894 proxyAgent.setSave(); 1895 setMsgTxName(message); 1897 message.save(); 1898 } 1899 } 1900 1901 for (Enumeration names = tSub.getNames(); names.hasMoreElements();) { 1902 subName = (String ) names.nextElement(); 1903 sub = (ClientSubscription) subsTable.get(subName); 1904 if (sub == null) continue; 1905 1906 if (sub.getActive()) { 1908 ConsumerMessages consM = sub.deliver(); 1909 1910 if (consM != null) { 1911 try { 1912 setCtx(sub.getContextId()); 1913 if (activeCtx.getActivated()) 1914 doReply(consM); 1915 else 1916 activeCtx.addPendingDelivery(consM); 1917 } catch (StateException pE) { 1918 } 1920 } 1921 } 1922 } 1923 } 1924 1925 1933 private void doReact(AgentId from, ExceptionReply rep) { 1934 if (logger.isLoggable(BasicLevel.DEBUG)) 1935 logger.log(BasicLevel.DEBUG, 1936 "ProxyImpl.doReact(" + from + ',' + rep + ')'); 1937 MomException exc = rep.getException(); 1938 1939 if (exc instanceof AccessException) { 1941 if (logger.isLoggable(BasicLevel.DEBUG)) 1942 logger.log(BasicLevel.DEBUG, " -> topicsTable.remove(" + from + ')'); 1943 TopicSubscription tSub = (TopicSubscription) topicsTable.remove(from); 1944 if (tSub != null) { 1945 String name; 1946 ClientSubscription sub; 1947 for (Enumeration e = tSub.getNames(); e.hasMoreElements();) { 1948 name = (String ) e.nextElement(); 1949 sub = (ClientSubscription) subsTable.remove(name); 1950 sub.delete(); 1951 1952 try { 1953 setCtx(sub.getContextId()); 1954 activeCtx.removeSubName(name); 1955 doReply(new MomExceptionReply(rep.getCorrelationId(), exc)); 1956 } catch (StateException pExc) {} 1957 } 1958 return; 1959 } 1960 } 1961 try { 1963 setCtx(rep.getClientContext()); 1964 doReply(new MomExceptionReply(rep.getCorrelationId(), exc)); 1965 } catch (StateException pExc) {} 1966 } 1967 1968 1972 private void doReact(AdminReply reply) 1973 {} 1974 1975 1989 private void doReact(UnknownAgent uA) { 1990 if (logger.isLoggable(BasicLevel.DEBUG)) 1991 logger.log(BasicLevel.DEBUG, "ProxyImpl.doReact(" + uA + ')'); 1992 Notification not = uA.not; 1993 AgentId agId = uA.agent; 1994 1995 if (logger.isLoggable(BasicLevel.INFO)) 1996 logger.log(BasicLevel.INFO, 1997 "--- " + this + " notified of invalid destination: " + agId.toString()); 1998 1999 if (logger.isLoggable(BasicLevel.DEBUG)) 2001 logger.log(BasicLevel.DEBUG, 2002 " -> topicsTable.remove(" + agId + ')'); 2003 TopicSubscription tSub = (TopicSubscription) topicsTable.remove(agId); 2004 if (tSub != null) { 2005 String name; 2006 ClientSubscription sub; 2007 DestinationException exc; 2008 exc = new DestinationException("Destination " + agId + 2009 " does not exist."); 2010 for (Enumeration e = tSub.getNames(); e.hasMoreElements();) { 2011 name = (String ) e.nextElement(); 2012 sub = (ClientSubscription) subsTable.remove(name); 2013 sub.delete(); 2014 2015 try { 2016 setCtx(sub.getContextId()); 2017 activeCtx.removeSubName(name); 2018 doReply(new MomExceptionReply(sub.getSubRequestId(), exc)); 2019 } catch (StateException pExc) {} 2020 } 2021 return; 2022 } 2023 2024 if (not instanceof AbstractRequest) { 2025 AbstractRequest req = (AbstractRequest) not; 2026 2027 if (req instanceof ClientMessages) { 2029 if (dmqId != null && agId.equals(dmqId)) { 2032 proxyAgent.setSave(); 2034 dmqId = null; 2035 for (Enumeration keys = subsTable.keys(); keys.hasMoreElements();) 2036 ((ClientSubscription) 2037 subsTable.get(keys.nextElement())).setDMQId(null); 2038 } 2039 if (DeadMQueueImpl.getId() != null 2041 && ! agId.equals(DeadMQueueImpl.getId())) { 2042 for (Enumeration msgs = ((ClientMessages) req).getMessages().elements(); 2044 msgs.hasMoreElements();) { 2045 org.objectweb.joram.shared.messages.Message msg = (org.objectweb.joram.shared.messages.Message) msgs.nextElement(); 2046 msg.deletedDest = true; 2047 } 2048 sendToDMQ((ClientMessages) req); 2049 } 2050 2051 DestinationException exc; 2052 exc = new DestinationException("Destination " + agId + 2053 " does not exist."); 2054 MomExceptionReply mer = new MomExceptionReply(req.getRequestId(), exc); 2055 try { 2056 setCtx(req.getClientContext()); 2057 doReply(mer); 2060 } catch (StateException se) { 2061 if (logger.isLoggable(BasicLevel.DEBUG)) 2062 logger.log(BasicLevel.DEBUG, "", se); 2063 } 2065 } else if (req instanceof ReceiveRequest) { 2066 DestinationException exc = new DestinationException( 2067 "Destination " + agId + " does not exist."); 2068 MomExceptionReply mer = new MomExceptionReply( 2069 req.getRequestId(), exc); 2070 try { 2071 setCtx(req.getClientContext()); 2072 if (activeCtx.getActivated()) { 2073 doReply(mer); 2074 } else { 2075 activeCtx.addPendingDelivery(mer); 2076 } 2077 } catch (StateException se) { 2078 if (logger.isLoggable(BasicLevel.DEBUG)) 2079 logger.log(BasicLevel.DEBUG, "", se); 2080 } 2082 } 2083 if (logger.isLoggable(BasicLevel.INFO)) 2084 logger.log(BasicLevel.INFO, 2085 "Connection " + req.getClientContext() + 2086 " notified of the deletion of destination " + agId); 2087 } 2088 } 2089 2090 private void doReact(UserAdminRequestNot not) { 2091 org.objectweb.joram.shared.admin.AdminRequest adminRequest = 2092 not.getRequest(); 2093 if (adminRequest instanceof GetSubscriptions) { 2094 doReact((GetSubscriptions)adminRequest, 2095 not.getReplyTo(), 2096 not.getRequestMsgId(), 2097 not.getReplyMsgId()); 2098 } else if (adminRequest instanceof GetSubscriptionMessageIds) { 2099 doReact((GetSubscriptionMessageIds)adminRequest, 2100 not.getReplyTo(), 2101 not.getRequestMsgId(), 2102 not.getReplyMsgId()); 2103 } else if (adminRequest instanceof GetSubscriptionMessage) { 2104 doReact((GetSubscriptionMessage)adminRequest, 2105 not.getReplyTo(), 2106 not.getRequestMsgId(), 2107 not.getReplyMsgId()); 2108 } else if (adminRequest instanceof DeleteSubscriptionMessage) { 2109 doReact((DeleteSubscriptionMessage)adminRequest, 2110 not.getReplyTo(), 2111 not.getRequestMsgId(), 2112 not.getReplyMsgId()); 2113 } else if (adminRequest instanceof GetSubscription) { 2114 doReact((GetSubscription)adminRequest, 2115 not.getReplyTo(), 2116 not.getRequestMsgId(), 2117 not.getReplyMsgId()); 2118 } else if (adminRequest instanceof ClearSubscription) { 2119 doReact((ClearSubscription)adminRequest, 2120 not.getReplyTo(), 2121 not.getRequestMsgId(), 2122 not.getReplyMsgId()); 2123 } 2124 } 2125 2126 private void doReact(GetSubscriptions request, 2127 AgentId replyTo, 2128 String requestMsgId, 2129 String replyMsgId) { 2130 Enumeration keys = subsTable.keys(); 2131 Enumeration values = subsTable.elements(); 2132 String [] subNames = new String [subsTable.size()]; 2133 String [] topicIds = new String [subsTable.size()]; 2134 int[] messageCounts = new int[subsTable.size()]; 2135 boolean[] durable = new boolean[subsTable.size()]; 2136 int i = 0; 2137 while (keys.hasMoreElements()) { 2138 subNames[i] = (String )keys.nextElement(); 2139 ClientSubscription cs = 2140 (ClientSubscription)values.nextElement(); 2141 topicIds[i] = cs.getTopicId().toString(); 2142 messageCounts[i] = cs.getMessageCount(); 2143 durable[i] = cs.getDurable(); 2144 i++; 2145 } 2146 GetSubscriptionsRep reply = new GetSubscriptionsRep( 2147 subNames, topicIds, messageCounts, durable); 2148 replyToTopic(reply, replyTo, requestMsgId, replyMsgId); 2149 } 2150 2151 2157 public String [] getSubscriptionNames() { 2158 Enumeration keys = subsTable.keys(); 2159 String [] res = new String [subsTable.size()]; 2160 int i = 0; 2161 while (keys.hasMoreElements()) { 2162 res[i] = (String )keys.nextElement(); 2163 i++; 2164 } 2165 return res; 2166 } 2167 2168 2174 public int getSubscriptionMessageCount(String subName) { 2175 ClientSubscription cs = 2176 (ClientSubscription)subsTable.get(subName); 2177 return cs.getMessageCount(); 2178 } 2179 2180 2186 public String getSubscriptionTopicId(String subName) { 2187 ClientSubscription cs = 2188 (ClientSubscription)subsTable.get(subName); 2189 return cs.getTopicId().toString(); 2190 } 2191 2192 private void doReact(GetSubscriptionMessageIds request, 2193 AgentId replyTo, 2194 String requestMsgId, 2195 String replyMsgId) { 2196 String subName = request.getSubscriptionName(); 2197 ClientSubscription cs = null; 2198 if (subName != null) { 2199 cs = (ClientSubscription)subsTable.get(subName); 2200 } 2201 if (cs != null) { 2202 GetSubscriptionMessageIdsRep reply = 2203 new GetSubscriptionMessageIdsRep( 2204 cs.getMessageIds()); 2205 replyToTopic(reply, replyTo, requestMsgId, replyMsgId); 2206 } else { 2207 replyToTopic( 2208 new org.objectweb.joram.shared.admin.AdminReply( 2209 false, "Subscription not found: " + 2210 request.getSubscriptionName()), 2211 replyTo, requestMsgId, replyMsgId); 2212 } 2213 } 2214 2215 2222 public String [] getSubscriptionMessageIds(String subName) { 2223 ClientSubscription cs = 2224 (ClientSubscription)subsTable.get(subName); 2225 if (cs != null) { 2226 return cs.getMessageIds(); 2227 } else return null; 2228 } 2229 2230 private void doReact(GetSubscription request, 2231 AgentId replyTo, 2232 String requestMsgId, 2233 String replyMsgId) { 2234 String subName = request.getSubscriptionName(); 2235 ClientSubscription cs = null; 2236 if (subName != null) { 2237 cs = (ClientSubscription)subsTable.get(subName); 2238 } 2239 if (cs != null) { 2240 GetSubscriptionRep reply = 2241 new GetSubscriptionRep( 2242 cs.getTopicId().toString(), 2243 cs.getMessageCount(), 2244 cs.getDurable()); 2245 replyToTopic(reply, replyTo, requestMsgId, replyMsgId); 2246 } else { 2247 replyToTopic( 2248 new org.objectweb.joram.shared.admin.AdminReply( 2249 false, "Subscription not found: " + 2250 request.getSubscriptionName()), 2251 replyTo, requestMsgId, replyMsgId); 2252 } 2253 } 2254 2255 private void doReact(GetSubscriptionMessage request, 2256 AgentId replyTo, 2257 String requestMsgId, 2258 String replyMsgId) { 2259 ClientSubscription cs = null; 2260 String subName = request.getSubscriptionName(); 2261 if (subName != null) { 2262 cs = (ClientSubscription)subsTable.get(subName); 2263 } 2264 if (cs != null) { 2265 String msgId = request.getMessageId(); 2266 Message message = null; 2267 if (msgId != null) { 2268 message = cs.getMessage(msgId); 2269 } 2270 if (message != null) { 2271 GetSubscriptionMessageRep reply = 2272 new GetSubscriptionMessageRep(message.msg); 2273 replyToTopic(reply, replyTo, requestMsgId, replyMsgId); 2274 } else { 2275 replyToTopic( 2276 new org.objectweb.joram.shared.admin.AdminReply( 2277 false, "Message not found: " + 2278 request.getMessageId()), 2279 replyTo, requestMsgId, replyMsgId); 2280 } 2281 } else { 2282 replyToTopic( 2283 new org.objectweb.joram.shared.admin.AdminReply( 2284 false, "Subscription not found: " + 2285 subName), 2286 replyTo, requestMsgId, replyMsgId); 2287 } 2288 } 2289 2290 2300 public CompositeDataSupport getSubscriptionMessage( 2301 String subName, 2302 String msgId) throws Exception { 2303 if (logger.isLoggable(BasicLevel.DEBUG)) 2304 logger.log(BasicLevel.DEBUG, 2305 "ProxyImpl.getSubscriptionMessage(" + 2306 subName + ',' + msgId + ')'); 2307 ClientSubscription cs = 2308 (ClientSubscription) subsTable.get(subName); 2309 if (cs != null) { 2310 Message msg = cs.getMessage(msgId); 2311 if (msg != null) { 2312 return MessageJMXWrapper.createCompositeDataSupport(msg); 2313 } else { 2314 throw new Exception ("Message not found"); 2315 } 2316 } else { 2317 throw new Exception ("Subscription not found"); 2318 } 2319 } 2320 2321 private void doReact(DeleteSubscriptionMessage request, 2322 AgentId replyTo, 2323 String requestMsgId, 2324 String replyMsgId) { 2325 String subName = request.getSubscriptionName(); 2326 ClientSubscription cs = null; 2327 if (subName != null) { 2328 cs = (ClientSubscription)subsTable.get(subName); 2329 } 2330 if (cs != null) { 2331 cs.deleteMessage(request.getMessageId()); 2332 replyToTopic( 2333 new org.objectweb.joram.shared.admin.AdminReply( 2334 true, null), 2335 replyTo, requestMsgId, replyMsgId); 2336 } else { 2337 replyToTopic( 2338 new org.objectweb.joram.shared.admin.AdminReply( 2339 false, "Subscription not found: " + 2340 request.getSubscriptionName()), 2341 replyTo, requestMsgId, replyMsgId); 2342 } 2343 } 2344 2345 2353 public void deleteSubscriptionMessage(String subName, 2354 String msgId) { 2355 ClientSubscription cs = (ClientSubscription)subsTable.get(subName); 2356 if (cs != null) { 2357 cs.deleteMessage(msgId); 2358 } 2359 } 2360 2361 private void doReact(ClearSubscription request, 2362 AgentId replyTo, 2363 String requestMsgId, 2364 String replyMsgId) { 2365 String subName = request.getSubscriptionName(); 2366 ClientSubscription cs = null; 2367 if (subName != null) { 2368 cs = (ClientSubscription) subsTable.get(subName); 2369 } 2370 if (cs != null) { 2371 cs.clear(); 2372 replyToTopic( 2373 new org.objectweb.joram.shared.admin.AdminReply( 2374 true, null), 2375 replyTo, requestMsgId, replyMsgId); 2376 } else { 2377 replyToTopic( 2378 new org.objectweb.joram.shared.admin.AdminReply( 2379 false, "Subscription not found: " + 2380 request.getSubscriptionName()), 2381 replyTo, requestMsgId, replyMsgId); 2382 } 2383 } 2384 2385 private void replyToTopic( 2386 org.objectweb.joram.shared.admin.AdminReply reply, 2387 AgentId replyTo, 2388 String requestMsgId, 2389 String replyMsgId) { 2390 org.objectweb.joram.shared.messages.Message message = new org.objectweb.joram.shared.messages.Message(); 2391 message.correlationId = requestMsgId; 2392 message.timestamp = System.currentTimeMillis(); 2393 message.setDestination(replyTo.toString(), Topic.TOPIC_TYPE); 2394 message.id = replyMsgId;; 2395 try { 2396 message.setObject(reply); 2397 ClientMessages clientMessages = new ClientMessages(-1, -1, message); 2398 Channel.sendTo(replyTo, clientMessages); 2399 } catch (Exception exc) { 2400 if (logger.isLoggable(BasicLevel.ERROR)) 2401 logger.log(BasicLevel.ERROR, "", exc); 2402 throw new Error (exc.getMessage()); 2403 } 2404 } 2405 2406 2414 private void setCtx(int key) throws StateException { 2415 if (logger.isLoggable(BasicLevel.DEBUG)) 2416 logger.log(BasicLevel.DEBUG, "ProxyImpl.setCtx(" + key + ')'); 2417 2418 if (key < 0) throw new StateException("Invalid context: " + key); 2419 2420 if (key == activeCtxId) return; 2423 2424 setActiveCtxId(key); 2426 activeCtx = (ClientContext) contexts.get(new Integer (key)); 2427 2428 if (activeCtx == null) { 2430 setActiveCtxId(-1); 2431 activeCtx = null; 2432 throw new StateException("Context " + key + " is closed or broken."); 2433 } 2434 } 2435 2436 2442 private void doReply(AbstractJmsReply reply) { 2443 doReply(activeCtxId, reply); 2444 } 2445 2446 ClientContext getClientContext(int ctxId) { 2447 return (ClientContext)contexts.get( 2448 new Integer (ctxId)); 2449 } 2450 2451 2458 private void doReply(int key, AbstractJmsReply reply) { 2459 if (logger.isLoggable(BasicLevel.DEBUG)) 2460 logger.log(BasicLevel.DEBUG, 2461 "ProxyImpl.doReply(" + key + ',' + reply + ')'); 2462 proxyAgent.sendToClient(key, reply); 2463 } 2464 2465 2468 private void sendToDMQ(ClientMessages messages) 2469 { 2470 if (dmqId != null) 2471 proxyAgent.sendNot(dmqId, messages); 2472 else if (DeadMQueueImpl.getId() != null) 2473 proxyAgent.sendNot(DeadMQueueImpl.getId(), messages); 2474 } 2475 2476 void cleanPendingMessages(long currentTime) { 2477 if (logger.isLoggable(BasicLevel.DEBUG)) 2478 logger.log(BasicLevel.DEBUG, 2479 "ProxyImpl.cleanPendingMessages(" + messagesTable.size() + ')'); 2480 2481 2482 String id = null; 2483 Message message = null; 2484 ClientMessages deadMessages = null; 2485 2486 for (Enumeration ids = messagesTable.keys(); ids.hasMoreElements(); ) { 2487 id = (String ) ids.nextElement(); 2488 message = (Message) messagesTable.get(id); 2489 if ((message == null) || message.isValid(currentTime)) continue; 2490 2491 messagesTable.remove(id); 2492 message.delete(); 2493 message.msg.expired = true; 2494 2495 if (deadMessages == null) 2496 deadMessages = new ClientMessages(); 2497 deadMessages.addMessage(message.msg); 2498 2499 if (logger.isLoggable(BasicLevel.DEBUG)) 2500 logger.log(BasicLevel.DEBUG, 2501 "ProxyImpl expired message " + message.getIdentifier()); 2502 } 2503 if (deadMessages != null) sendToDMQ(deadMessages); 2505 2506 if (logger.isLoggable(BasicLevel.DEBUG)) 2507 logger.log(BasicLevel.DEBUG, 2508 "ProxyImpl.cleanPendingMessages -> " + messagesTable.size()); 2509 } 2510 2511 2518 public void deleteProxy(AgentId from) throws Exception 2519 { 2520 if (logger.isLoggable(BasicLevel.DEBUG)) 2521 logger.log(BasicLevel.DEBUG, 2522 "--- " + this + " notified to be deleted."); 2523 2524 if (! from.equals(AdminTopicImpl.getReference().getId())) 2525 throw new Exception (); 2526 2527 Enumeration keys = contexts.keys(); 2529 int key; 2530 while (keys.hasMoreElements()) { 2531 key = ((Integer ) keys.nextElement()).intValue(); 2532 try { 2533 setCtx(key); 2534 2535 doReply(new MomExceptionReply(new StateException("Client proxy is deleted."))); 2536 2537 AgentId id; 2539 for (Enumeration ids = activeCtx.getDeliveringQueues(); 2540 ids.hasMoreElements();) { 2541 id = (AgentId) ids.nextElement(); 2542 proxyAgent.sendNot(id, new DenyRequest(activeCtxId)); 2543 } 2544 2545 AgentId destId; 2547 for (Enumeration dests = activeCtx.getTempDestinations(); 2548 dests.hasMoreElements();) { 2549 destId = (AgentId) dests.nextElement(); 2550 deleteTemporaryDestination(destId); 2551 2552 if (logger.isLoggable(BasicLevel.DEBUG)) 2553 logger.log(BasicLevel.DEBUG, 2554 "Sending DeleteNot to temporary destination " + 2555 destId.toString()); 2556 } 2557 } catch (StateException pE) {} 2558 } 2559 2560 AgentId destId; 2562 for (Enumeration topics = topicsTable.keys(); topics.hasMoreElements();) { 2563 destId = (AgentId) topics.nextElement(); 2564 if (logger.isLoggable(BasicLevel.DEBUG)) 2565 logger.log(BasicLevel.DEBUG, " -> topicsTable.remove(" + destId + ')'); 2566 topicsTable.remove(destId); 2567 updateSubscriptionToTopic(destId, -1, -1); 2568 } 2569 } 2570 2571 2572 2582 private boolean updateSubscriptionToTopic(AgentId topicId, 2583 int contextId, 2584 int requestId) 2585 { 2586 if (logger.isLoggable(BasicLevel.DEBUG)) 2587 logger.log(BasicLevel.DEBUG, 2588 "ProxyImpl.updateSubscriptionToTopic(" + 2589 topicId + ',' + contextId + ',' + requestId + ')'); 2590 TopicSubscription tSub = (TopicSubscription) topicsTable.get(topicId); 2591 2592 if (tSub == null || tSub.isEmpty()) { 2594 if (logger.isLoggable(BasicLevel.DEBUG)) 2595 logger.log(BasicLevel.DEBUG, 2596 " -> topicsTable.remove(" + topicId + ')'); 2597 topicsTable.remove(topicId); 2598 proxyAgent.sendNot(topicId, 2599 new UnsubscribeRequest(contextId, requestId)); 2600 return false; 2601 } 2602 2603 String builtSelector = tSub.buildSelector(); 2605 if (tSub.getLastSelector() != null 2606 && builtSelector.equals(tSub.getLastSelector())) 2607 return false; 2608 2609 tSub.setLastSelector(builtSelector); 2610 proxyAgent.sendNot(topicId, new SubscribeRequest(contextId, 2611 requestId, 2612 builtSelector)); 2613 2614 return true; 2615 } 2616 2617 public AgentId getId() { 2618 return proxyAgent.getId(); 2619 } 2620 2621 public String getStringId() { 2622 return proxyAgent.getId().toString(); 2623 } 2624 2625 public void readBag(ObjectInputStream in) 2626 throws IOException , ClassNotFoundException { 2627 if (logger.isLoggable(BasicLevel.DEBUG)) 2628 logger.log(BasicLevel.DEBUG, 2629 "ProxyImpl[" + proxyAgent.getId() + "].readbag()"); 2630 2631 activeCtxId = in.readInt(); 2632 2645 2646 int size = in.readInt(); 2647 Object obj=null; 2648 for(int j=0;j<size;j++){ 2649 obj=in.readObject(); 2650 ClientContext cc = (ClientContext) contexts.get((Integer )obj); 2651 cc.setProxyAgent(proxyAgent); 2652 cc.readBag(in); 2653 } 2654 size = in.readInt(); 2655 for(int j=0;j<size;j++){ 2656 obj=in.readObject(); 2657 ClientSubscription cs = (ClientSubscription)subsTable.get((String )obj); 2658 cs.setProxyAgent(proxyAgent); 2659 cs.readBag(in); 2660 } 2661 2662 2663 activeCtx = (ClientContext)contexts.get( 2664 new Integer (activeCtxId)); 2665 2666 Vector messages = (Vector )in.readObject(); 2667 2668 if (logger.isLoggable(BasicLevel.DEBUG)) 2669 logger.log(BasicLevel.DEBUG, " -> messages = " + messages); 2670 2671 topicsTable = new Hashtable (); 2672 messagesTable = new Hashtable (); 2673 2674 Vector topics = new Vector (); 2675 TopicSubscription tSub; 2676 2677 2678 for (Enumeration subNames = subsTable.keys(); 2679 subNames.hasMoreElements();) { 2680 String subName = (String ) subNames.nextElement(); 2681 2682 if (logger.isLoggable(BasicLevel.DEBUG)) 2683 logger.log(BasicLevel.DEBUG, " -> subName = " + subName); 2684 2685 ClientSubscription cSub = (ClientSubscription) subsTable.get(subName); 2686 AgentId destId = cSub.getTopicId(); 2687 if (! topics.contains(destId)) 2688 topics.add(destId); 2689 cSub.reinitialize(getStringId(), 2690 messagesTable, 2691 messages, 2692 false); 2693 2694 if (logger.isLoggable(BasicLevel.DEBUG)) 2695 logger.log(BasicLevel.DEBUG, " -> destId = " + destId + ')'); 2696 2697 tSub = (TopicSubscription) topicsTable.get(destId); 2698 if (tSub == null) { 2699 tSub = new TopicSubscription(); 2700 topicsTable.put(destId, tSub); 2701 } 2702 tSub.putSubscription(subName, cSub.getSelector()); 2703 } 2704 if (logger.isLoggable(BasicLevel.DEBUG)) 2705 logger.log(BasicLevel.DEBUG, " -> topicsTable = " + topicsTable); 2706 2707 } 2713 2714 public void writeBag(ObjectOutputStream out) 2715 throws IOException { 2716 if (logger.isLoggable(BasicLevel.DEBUG)) 2717 logger.log(BasicLevel.DEBUG, 2718 "ProxyImpl[" + proxyAgent.getId() + "].writeBag()"); 2719 2720 out.writeInt(activeCtxId); 2721 2722 2730 2731 out.writeInt(contexts.size()); 2733 Enumeration elements = contexts.keys(); 2734 Object obj=null; 2735 while (elements.hasMoreElements()) { 2736 obj=elements.nextElement(); 2737 out.writeObject(obj); 2738 ((ClientContext)contexts.get((Integer )obj)).writeBag(out); 2739 } 2740 out.writeInt(subsTable.size()); 2742 elements = subsTable.keys(); 2743 while (elements.hasMoreElements()) { 2744 obj=elements.nextElement(); 2745 out.writeObject(obj); 2746 ((ClientSubscription)subsTable.get((String )obj)).writeBag(out); 2747 } 2748 2749 2750 Vector messages = new Vector (); 2751 elements = messagesTable.elements(); 2752 while (elements.hasMoreElements()) { 2753 messages.addElement(elements.nextElement()); 2754 } 2755 2756 if (logger.isLoggable(BasicLevel.DEBUG)) 2757 logger.log(BasicLevel.DEBUG, " -> messages = " + messages + ')'); 2758 2759 out.writeObject(messages); 2760 2761 } 2762} 2763 2764 2768class Xid implements java.io.Serializable { 2769 byte[] bq; 2770 int fi; 2771 byte[] gti; 2772 2773 2774 Xid(byte[] bq, int fi, byte[] gti) 2775 { 2776 this.bq = bq; 2777 this.fi = fi; 2778 this.gti = gti; 2779 } 2780 2781 public boolean equals(Object o) 2782 { 2783 if (! (o instanceof Xid)) 2784 return false; 2785 2786 Xid other = (Xid) o; 2787 2788 return java.util.Arrays.equals(bq, other.bq) 2789 && fi == other.fi 2790 && java.util.Arrays.equals(gti, other.gti); 2791 } 2792 2793 public int hashCode() 2794 { 2795 return (new String (bq) + "-" + new String (gti)).hashCode(); 2796 } 2797} 2798 | Popular Tags |