| 1 26 package org.objectweb.joram.mom.proxies; 27 28 import java.io.IOException ; 29 import java.io.ObjectInputStream ; 30 import java.io.ObjectOutputStream ; 31 32 import java.util.Enumeration ; 33 import java.util.Hashtable ; 34 import java.util.Vector ; 35 36 import fr.dyade.aaa.agent.AgentId; 37 import fr.dyade.aaa.agent.DeleteNot; 38 import fr.dyade.aaa.agent.Notification; 39 import fr.dyade.aaa.agent.UnknownAgent; 40 import fr.dyade.aaa.agent.UnknownNotificationException; 41 import fr.dyade.aaa.agent.Channel; 42 43 import org.objectweb.joram.mom.dest.*; 44 import org.objectweb.joram.mom.notifications.*; 45 import org.objectweb.joram.mom.messages.Message; 46 47 import org.objectweb.joram.shared.admin.DeleteUser; 48 import org.objectweb.joram.shared.admin.UpdateUser; 49 50 import org.objectweb.joram.shared.admin.GetSubscriptions; 51 import org.objectweb.joram.shared.admin.GetSubscriptionsRep; 52 import org.objectweb.joram.shared.admin.GetSubscriptionMessageIds; 53 import org.objectweb.joram.shared.admin.GetSubscriptionMessageIdsRep; 54 import org.objectweb.joram.shared.admin.GetSubscriptionMessage; 55 import org.objectweb.joram.shared.admin.GetSubscriptionMessageRep; 56 import org.objectweb.joram.shared.admin.DeleteSubscriptionMessage; 57 import org.objectweb.joram.shared.admin.GetSubscription; 58 import org.objectweb.joram.shared.admin.GetSubscriptionRep; 59 import org.objectweb.joram.shared.admin.ClearSubscription; 60 61 import org.objectweb.joram.shared.client.*; 62 import org.objectweb.joram.shared.excepts.*; 63 64 import javax.management.openmbean.CompositeDataSupport ; 65 66 import fr.dyade.aaa.util.Debug; 67 import org.objectweb.util.monolog.api.BasicLevel; 68 import org.objectweb.util.monolog.api.Logger; 69 70 75 public class ProxyImpl implements java.io.Serializable , ProxyImplMBean { 76 public static Logger logger = Debug.getLogger(ProxyImpl.class.getName()); 77 78 79 protected long period = 60000L; 80 81 86 public long getPeriod() { 87 return period; 88 } 89 90 96 public void setPeriod(long period) { 97 if ((this.period == -1L) && (period != -1L)) { 98 Channel.sendTo(proxyAgent.getId(), new WakeUpNot()); 100 } 101 this.period = period; 102 } 103 104 108 private AgentId dmqId = null; 109 113 private Integer threshold = null; 114 115 121 private Hashtable contexts; 122 128 private Hashtable subsTable; 129 135 private Hashtable recoveredTransactions; 136 137 138 private long arrivalsCounter = 0; 139 140 141 private ProxyAgentItf proxyAgent; 142 148 private transient Hashtable topicsTable; 149 155 private transient Hashtable messagesTable; 156 157 162 private transient int activeCtxId; 163 164 private transient ClientContext activeCtx; 165 166 169 public ProxyImpl(ProxyAgentItf proxyAgent) { 170 contexts = new Hashtable (); 171 subsTable = new Hashtable (); 172 this.proxyAgent = proxyAgent; 173 if (logger.isLoggable(BasicLevel.DEBUG)) 174 logger.log(BasicLevel.DEBUG, this + ": created."); 175 } 176 177 180 public String toString() { 181 if (proxyAgent == null) 182 return "ProxyImpl:"; 183 else 184 return "ProxyImpl:" + proxyAgent.getId(); 185 } 186 187 188 194 public void initialize(boolean firstTime) throws Exception { 195 if (logger.isLoggable(BasicLevel.DEBUG)) 196 logger.log(BasicLevel.DEBUG, "--- " + this + " (re)initializing..."); 197 198 topicsTable = new Hashtable (); 199 messagesTable = new Hashtable (); 200 201 setActiveCtxId(-1); 202 203 205 ClientContext activeCtx; 207 AgentId destId; 208 for (Enumeration ctxIds = contexts.keys(); ctxIds.hasMoreElements();) { 209 activeCtx = (ClientContext) contexts.remove(ctxIds.nextElement()); 210 211 for (Enumeration queueIds = activeCtx.getDeliveringQueues(); 213 queueIds.hasMoreElements();) { 214 destId = (AgentId) queueIds.nextElement(); 215 proxyAgent.sendNot(destId, new DenyRequest(activeCtx.getId())); 216 217 if (logger.isLoggable(BasicLevel.DEBUG)) 218 logger.log(BasicLevel.DEBUG, 219 "Denies messages on queue " + destId.toString()); 220 } 221 222 Enumeration xids = activeCtx.getTxIds(); 224 Xid xid; 225 XACnxPrepare recoveredPrepare; 226 XACnxPrepare prepare; 227 while (xids.hasMoreElements()) { 228 if (recoveredTransactions == null) 229 recoveredTransactions = new Hashtable (); 230 231 xid = (Xid) xids.nextElement(); 232 233 recoveredPrepare = (XACnxPrepare) recoveredTransactions.get(xid); 234 prepare = activeCtx.getTxPrepare(xid); 235 236 if (recoveredPrepare == null) 237 recoveredTransactions.put(xid, prepare); 238 else { 239 recoveredPrepare.getSendings().addAll(prepare.getSendings()); 240 recoveredPrepare.getAcks().addAll(prepare.getAcks()); 241 } 242 } 243 244 for (Enumeration tempDests = activeCtx.getTempDestinations(); 246 tempDests.hasMoreElements();) { 247 destId = (AgentId) tempDests.nextElement(); 248 deleteTemporaryDestination(destId); 249 250 if (logger.isLoggable(BasicLevel.DEBUG)) 251 logger.log(BasicLevel.DEBUG, 252 "Deletes temporary destination " + destId.toString()); 253 } 254 } 255 256 Vector messages = Message.loadAll(getMsgTxname()); 258 259 if (subsTable.isEmpty()) { 260 Message.deleteAll(getMsgTxname()); 263 } 264 265 String subName; 267 ClientSubscription cSub; 268 Vector topics = new Vector (); 269 TopicSubscription tSub; 270 for (Enumeration subNames = subsTable.keys(); 271 subNames.hasMoreElements();) { 272 subName = (String ) subNames.nextElement(); 273 cSub = (ClientSubscription) subsTable.get(subName); 274 destId = cSub.getTopicId(); 275 if (! topics.contains(destId)) 276 topics.add(destId); 277 if (! cSub.getDurable()) 279 subsTable.remove(subName); 280 else { 282 cSub.setProxyAgent(proxyAgent); 283 cSub.reinitialize(getStringId(), 284 messagesTable, 285 messages, 286 true); 287 tSub = (TopicSubscription) topicsTable.get(destId); 288 if (tSub == null) { 289 tSub = new TopicSubscription(); 290 topicsTable.put(destId, tSub); 291 } 292 tSub.putSubscription(subName, cSub.getSelector()); 293 } 294 } 295 for (Enumeration topicIds = topics.elements(); 297 topicIds.hasMoreElements();) 298 updateSubscriptionToTopic((AgentId) topicIds.nextElement(), -1, -1); 299 } 300 301 private void setActiveCtxId(int activeCtxId) { 302 if (logger.isLoggable(BasicLevel.DEBUG)) 303 logger.log(BasicLevel.DEBUG, 304 "ProxyImpl.setActiveCtxId(" + activeCtxId + ')'); 305 this.activeCtxId = activeCtxId; 306 } 307 308 317 public void reactToClientRequest(int key, AbstractJmsRequest request) 318 { 319 try { 320 if (logger.isLoggable(BasicLevel.DEBUG)) 321 logger.log(BasicLevel.DEBUG, 322 "--- " + this + " got " + request.getClass().getName() + 323 " with id: " + request.getRequestId() + 324 " through activeCtx: " + key); 325 326 if (request instanceof ProducerMessages) 327 reactToClientRequest(key, (ProducerMessages) request); 328 else if (request instanceof ConsumerReceiveRequest) 329 reactToClientRequest(key, (ConsumerReceiveRequest) request); 330 else if (request instanceof ConsumerSetListRequest) 331 reactToClientRequest(key, (ConsumerSetListRequest) request); 332 else if (request instanceof QBrowseRequest) 333 reactToClientRequest(key, (QBrowseRequest) request); 334 else if (request instanceof JmsRequestGroup) 335 reactToClientRequest(key, (JmsRequestGroup) request); 336 else { 337 doReact(key, request); 338 } 339 } 340 catch (IllegalArgumentException iE) { 343 DestinationException dE = 344 new DestinationException("Proxy could not forward the request to" 345 + " incorrectly identified destination: " 346 + iE); 347 348 doReply(key, new MomExceptionReply(request.getRequestId(), dE)); 349 } 350 } 351 352 358 private void reactToClientRequest(int key, ProducerMessages req) { 359 if (logger.isLoggable(BasicLevel.DEBUG)) 360 logger.log(BasicLevel.DEBUG, 361 "ProxyImpl.reactToClientRequest(" + key + ',' + req + ')'); 362 363 AgentId destId = AgentId.fromString(req.getTarget()); 364 ClientMessages not = new ClientMessages( 365 key, 366 req.getRequestId(), 367 req.getMessages()); 368 369 setDmq(not); 370 371 372 if (destId.getTo() == proxyAgent.getId().getTo()) { 373 if (logger.isLoggable(BasicLevel.DEBUG)) 374 logger.log(BasicLevel.DEBUG, " -> local sending"); 375 not.setPersistent(false); 376 if (req.getAsyncSend()) { 377 not.setAsyncSend(true); 378 } 379 } else { 380 if (logger.isLoggable(BasicLevel.DEBUG)) 381 logger.log(BasicLevel.DEBUG, " -> remote sending"); 382 if (!req.getAsyncSend()) { 383 proxyAgent.sendNot(proxyAgent.getId(), 384 new SendReplyNot(key, req.getRequestId())); 385 } 386 } 387 388 proxyAgent.sendNot(destId, not); 389 } 390 391 private void setDmq(ClientMessages not) { 392 if (dmqId != null) { 394 not.setDMQId(dmqId); 395 } else { 396 not.setDMQId(DeadMQueueImpl.getId()); 397 } 398 } 399 400 405 private void reactToClientRequest(int key, ConsumerReceiveRequest req) 406 { 407 if (req.getQueueMode()) { 408 ReceiveRequest not = new ReceiveRequest( 409 key, 410 req.getRequestId(), 411 req.getSelector(), 412 req.getTimeToLive(), 413 req.getReceiveAck(), 414 null, 415 1); 416 AgentId to = AgentId.fromString(req.getTarget()); 417 if (to.getTo() == proxyAgent.getId().getTo()) { 418 if (logger.isLoggable(BasicLevel.DEBUG)) 419 logger.log(BasicLevel.DEBUG, " -> local receiving"); 420 not.setPersistent(false); 421 proxyAgent.sendNot(to, not); 422 } else { 423 proxyAgent.sendNot(to, not); 424 } 425 } else { 426 doReact(key, req); 427 } 428 } 429 430 435 private void reactToClientRequest(int key, ConsumerSetListRequest req) { 436 if (logger.isLoggable(BasicLevel.DEBUG)) 437 logger.log(BasicLevel.DEBUG, 438 "ProxyImp.reactToClientRequest(" + key + ',' + req + ')'); 439 if (req.getQueueMode()) { 440 ReceiveRequest not = new ReceiveRequest(key, 441 req.getRequestId(), 442 req.getSelector(), 443 0, 444 false, 445 req.getMessageIdsToAck(), 446 req.getMessageCount()); 447 AgentId to = AgentId.fromString(req.getTarget()); 448 if (to.getTo() == proxyAgent.getId().getTo()) { 449 if (logger.isLoggable(BasicLevel.DEBUG)) 450 logger.log(BasicLevel.DEBUG, " -> local sending"); 451 not.setPersistent(false); 452 proxyAgent.sendNot(to, not); 453 } else { 454 proxyAgent.sendNot(to, not); 455 } 456 } 457 else { 458 doReact(key, req); 459 } 460 } 461 462 466 private void reactToClientRequest(int key, QBrowseRequest req) 467 { 468 proxyAgent.sendNot(AgentId.fromString(req.getTarget()), 469 new BrowseRequest(key, 470 req.getRequestId(), 471 req.getSelector())); 472 } 473 474 private void reactToClientRequest(int key, JmsRequestGroup request) { 475 AbstractJmsRequest[] requests = request.getRequests(); 476 RequestBuffer rm = new RequestBuffer(proxyAgent); 477 for (int i = 0; i < requests.length; i++) { 478 if (requests[i] instanceof ProducerMessages) { 479 ProducerMessages pm =(ProducerMessages) requests[i]; 480 rm.put(key, pm); 481 } else { 482 reactToClientRequest(key, requests[i]); 483 } 484 } 485 486 rm.flush(); 487 } 488 489 508 public void react(AgentId from, Notification not) 509 throws UnknownNotificationException 510 { 511 if (not instanceof SetDMQRequest) 513 doReact(from, (SetDMQRequest) not); 514 else if (not instanceof SetThreshRequest) 515 doReact(from, (SetThreshRequest) not); 516 else if (not instanceof SetNbMaxMsgRequest) 517 doReact(from, (SetNbMaxMsgRequest) not); 518 else if (not instanceof Monit_GetNbMaxMsg) 519 doReact(from, (Monit_GetNbMaxMsg) not); 520 else if (not instanceof Monit_GetDMQSettings) 521 doReact(from, (Monit_GetDMQSettings) not); 522 else if (not instanceof SyncReply) 524 doReact((SyncReply) not); 525 else if (not instanceof AbstractReply) 527 doFwd(from, (AbstractReply) not); 528 else if (not instanceof AdminReply) 529 doReact((AdminReply) not); 530 else if (not instanceof UnknownAgent) 532 doReact((UnknownAgent) not); 533 else if (not instanceof UserAdminRequestNot) 534 doReact((UserAdminRequestNot) not); 535 else 536 throw new UnknownNotificationException("Unexpected notification: " 537 + not.getClass().getName()); 538 } 539 540 541 572 private void doReact(int key, AbstractJmsRequest request) 573 { 574 try { 575 if (! (request instanceof CnxConnectRequest)) 578 setCtx(key); 579 580 if (request instanceof GetAdminTopicRequest) 581 doReact(key, (GetAdminTopicRequest) request); 582 else if (request instanceof CnxConnectRequest) 583 doReact(key, (CnxConnectRequest) request); 584 else if (request instanceof CnxStartRequest) 585 doReact((CnxStartRequest) request); 586 else if (request instanceof CnxStopRequest) 587 doReact((CnxStopRequest) request); 588 else if (request instanceof SessCreateTQRequest) 589 doReact((SessCreateTQRequest) request); 590 else if (request instanceof SessCreateTTRequest) 591 doReact((SessCreateTTRequest) request); 592 else if (request instanceof ConsumerSubRequest) 593 doReact((ConsumerSubRequest) request); 594 else if (request instanceof ConsumerUnsubRequest) 595 doReact((ConsumerUnsubRequest) request); 596 else if (request instanceof ConsumerCloseSubRequest) 597 doReact((ConsumerCloseSubRequest) request); 598 else if (request instanceof ConsumerSetListRequest) 599 doReact((ConsumerSetListRequest) request); 600 else if (request instanceof ConsumerUnsetListRequest) 601 doReact((ConsumerUnsetListRequest) request); 602 else if (request instanceof ConsumerReceiveRequest) 603 doReact((ConsumerReceiveRequest) request); 604 else if (request instanceof ConsumerAckRequest) 605 doReact((ConsumerAckRequest) request); 606 else if (request instanceof ConsumerDenyRequest) 607 doReact((ConsumerDenyRequest) request); 608 else if (request instanceof SessAckRequest) 609 doReact((SessAckRequest) request); 610 else if (request instanceof SessDenyRequest) 611 doReact((SessDenyRequest) request); 612 else if (request instanceof TempDestDeleteRequest) 613 doReact((TempDestDeleteRequest) request); 614 else if (request instanceof XACnxPrepare) 615 doReact((XACnxPrepare) request); 616 else if (request instanceof XACnxCommit) 617 doReact((XACnxCommit) request); 618 else if (request instanceof XACnxRollback) 619 doReact((XACnxRollback) request); 620 else if (request instanceof XACnxRecoverRequest) 621 doReact((XACnxRecoverRequest) request); 622 else if (request instanceof CnxCloseRequest) 623 doReact(key, (CnxCloseRequest) request); 624 else if (request instanceof ActivateConsumerRequest) 625 doReact(key, (ActivateConsumerRequest) request); 626 else if (request instanceof CommitRequest) 627 doReact(key, (CommitRequest)request); 628 } 629 catch (MomException mE) { 630 if (logger.isLoggable(BasicLevel.ERROR)) 631 logger.log(BasicLevel.ERROR, mE); 632 633 doReply(new MomExceptionReply(request.getRequestId(), mE)); 635 } 636 } 637 638 647 private void doReact(int key, GetAdminTopicRequest req) 648 throws AccessException 649 { 650 653 doReply( 654 key, 655 new GetAdminTopicReply( 656 req, 657 AdminTopicImpl.getReference().getId().toString())); 658 } 659 660 671 private void doReact(int key, CnxConnectRequest req) 672 throws DestinationException { 673 proxyAgent.setSave(); 675 676 setActiveCtxId(key); 677 activeCtx = new ClientContext(proxyAgent.getId(), key); 678 activeCtx.setProxyAgent(proxyAgent); 679 contexts.put(new Integer (key), activeCtx); 680 681 if (logger.isLoggable(BasicLevel.DEBUG)) 682 logger.log(BasicLevel.DEBUG, "Connection " + key + " opened."); 683 684 doReply(new CnxConnectReply(req, key, proxyAgent.getId().toString())); 685 } 686 687 694 private void doReact(CnxStartRequest req) { 695 activeCtx.setActivated(true); 696 697 for (Enumeration deliveries = activeCtx.getPendingDeliveries(); 699 deliveries.hasMoreElements();) 700 doReply((AbstractJmsReply) deliveries.nextElement()); 701 702 activeCtx.clearPendingDeliveries(); 704 } 705 706 712 private void doReact(CnxStopRequest req) { 713 activeCtx.setActivated(false); 714 doReply(new ServerReply(req)); 715 } 716 717 729 private void doReact(SessCreateTQRequest req) throws RequestException { 730 try { 731 Queue queue = new Queue(); 732 queue.init(proxyAgent.getId(), null); 733 AgentId qId = queue.getId(); 734 735 queue.deploy(); 736 737 proxyAgent.sendNot(qId, new SetRightRequest(null, null, 2)); 739 740 activeCtx.addTemporaryDestination(qId); 741 742 SessCreateTDReply reply = new SessCreateTDReply(req, qId.toString()); 743 proxyAgent.sendNot(proxyAgent.getId(), 744 new SyncReply(activeCtxId, reply)); 745 746 proxyAgent.sendNot(AdminTopic.getDefault(), 747 new RegisterTmpDestNot(qId, false, true)); 748 749 if (logger.isLoggable(BasicLevel.DEBUG)) 750 logger.log(BasicLevel.DEBUG, "Temporary queue " + qId + " created."); 751 } 752 catch (java.io.IOException iE) { 753 throw new RequestException("Could not create temporary queue: " + iE); 754 } 755 } 756 757 769 private void doReact(SessCreateTTRequest req) throws RequestException { 770 Topic topic = new Topic(); 771 topic.init(proxyAgent.getId(), null); 772 AgentId tId = topic.getId(); 773 774 try { 775 topic.deploy(); 776 777 proxyAgent.sendNot(tId, new SetRightRequest(null, null, 2)); 779 780 activeCtx.addTemporaryDestination(tId); 781 782 SessCreateTDReply reply = new SessCreateTDReply(req, tId.toString()); 783 proxyAgent.sendNot(proxyAgent.getId(), 784 new SyncReply(activeCtxId, reply)); 785 786 proxyAgent.sendNot(AdminTopic.getDefault(), 787 new RegisterTmpDestNot(tId, true, true)); 788 789 if (logger.isLoggable(BasicLevel.DEBUG)) 790 logger.log(BasicLevel.DEBUG, "Temporary topic" + tId + " created."); 791 } catch (java.io.IOException iE) { 792 topic = null; 793 throw new RequestException("Could not deploy temporary topic " 794 + tId + ": " + iE); 795 } 796 } 797 798 805 private void doReact(ConsumerSubRequest req) throws StateException { 806 AgentId topicId = AgentId.fromString(req.getTarget()); 807 String subName = req.getSubName(); 808 809 boolean newTopic = ! topicsTable.containsKey(topicId); 810 boolean newSub = ! subsTable.containsKey(subName); 811 812 TopicSubscription tSub; 813 ClientSubscription cSub; 814 815 boolean sent = false; 817 818 if (newTopic) { tSub = new TopicSubscription(); 820 topicsTable.put(topicId, tSub); 821 } else { tSub = (TopicSubscription) topicsTable.get(topicId); 823 } 824 825 if (newSub) { proxyAgent.setSave(); 828 cSub = new ClientSubscription(proxyAgent.getId(), 829 activeCtxId, 830 req.getRequestId(), 831 req.getDurable(), 832 topicId, 833 req.getSubName(), 834 req.getSelector(), 835 req.getNoLocal(), 836 dmqId, 837 threshold, 838 messagesTable); 839 cSub.setProxyAgent(proxyAgent); 840 841 if (logger.isLoggable(BasicLevel.DEBUG)) 842 logger.log(BasicLevel.DEBUG, "Subscription " + subName + " created."); 843 844 subsTable.put(subName, cSub); 845 tSub.putSubscription(subName, req.getSelector()); 846 sent = 847 updateSubscriptionToTopic(topicId, activeCtxId, req.getRequestId()); 848 } else { cSub = (ClientSubscription) subsTable.get(subName); 850 851 if (cSub.getActive()) 852 throw new StateException("The durable subscription " + subName + 853 " has already been activated."); 854 855 boolean updatedTopic = ! topicId.equals(cSub.getTopicId()); 857 if (updatedTopic) { 858 TopicSubscription oldTSub = 859 (TopicSubscription) topicsTable.get(cSub.getTopicId()); 860 oldTSub.removeSubscription(subName); 861 updateSubscriptionToTopic(cSub.getTopicId(), -1, -1); 862 } 863 864 boolean updatedSelector; 866 if (req.getSelector() == null && cSub.getSelector() != null) 867 updatedSelector = true; 868 else if (req.getSelector() != null && cSub.getSelector() == null) 869 updatedSelector = true; 870 else if (req.getSelector() == null && cSub.getSelector() == null) 871 updatedSelector = false; 872 else 873 updatedSelector = ! req.getSelector().equals(cSub.getSelector()); 874 875 cSub.reactivate(activeCtxId, 877 req.getRequestId(), 878 topicId, 879 req.getSelector(), 880 req.getNoLocal()); 881 882 if (logger.isLoggable(BasicLevel.DEBUG)) 883 logger.log(BasicLevel.DEBUG, 884 "Subscription " + subName + " reactivated."); 885 886 if (updatedTopic || updatedSelector) { 888 tSub.putSubscription(subName, req.getSelector()); 889 sent = updateSubscriptionToTopic(topicId, 890 activeCtxId, 891 req.getRequestId()); 892 } 893 } 894 activeCtx.addSubName(subName); 896 897 if (! sent) 899 proxyAgent.sendNot(proxyAgent.getId(), 900 new SyncReply(activeCtxId, new ServerReply(req))); 901 } 902 903 912 private void doReact(ConsumerSetListRequest req) throws DestinationException 913 { 914 String subName = req.getTarget(); 916 ClientSubscription sub = null; 917 if (subName != null) 918 sub = (ClientSubscription) subsTable.get(subName); 919 920 if (sub == null) 921 throw new DestinationException("Can't set a listener on the non existing subscription: " + subName); 922 |