1 package com.ubermq.jms.server.proc; 2 3 import java.io.*; 4 import java.lang.reflect.*; 5 import java.util.*; 6 7 import com.ubermq.jms.client.impl.*; 8 import com.ubermq.jms.common.datagram.*; 9 import com.ubermq.jms.common.datagram.control.*; 10 import com.ubermq.jms.common.overflow.*; 11 import com.ubermq.jms.common.routing.*; 12 import com.ubermq.jms.common.routing.impl.*; 13 import com.ubermq.jms.server.*; 14 import com.ubermq.jms.server.admin.*; 15 import com.ubermq.jms.server.journal.*; 16 import com.ubermq.jms.server.journal.impl.*; 17 import com.ubermq.kernel.*; 18 19 24 public final class DatagramProc 25 implements IMessageProcessor, MessageServerAdmin, IRouterStatistics 26 { 27 private static final org.apache.log4j.Logger log = org.apache.log4j.Logger.getLogger(DatagramProc.class); 28 29 private static final long INITIAL_TIMEOUT = Long.valueOf(Configurator.getProperty(ServerConfig.DGP_INITIAL_TIMEOUT, "50")).longValue(); 31 private static final long MAXIMUM_TIMEOUT = Long.valueOf(Configurator.getProperty(ServerConfig.DGP_MAXIMUM_TIMEOUT, "5000")).longValue(); 32 private static final int TIMEOUT_FACTOR = Integer.valueOf(Configurator.getProperty(ServerConfig.DGP_BACKOFF_MULTIPLIER, "2")).intValue(); 33 34 private static final boolean shouldSendAck = Boolean.valueOf(Configurator.getProperty(ServerConfig.DGP_SHOULD_SEND_ACKS, "true")).booleanValue(); 36 37 private static final String OVERFLOW_HANDLER_CLASS = Configurator.getProperty(ServerConfig.DGP_OVERFLOW_HANDLER, ""); 39 private static final String OVERFLOW_HANDLER_INIT = Configurator.getProperty(ServerConfig.DGP_OVERFLOW_HANDLER_INIT, ""); 40 41 private static final String QUEUE_NAME_PREFIX = "$queue."; 43 44 private ISettingsRepository journal; 46 private DatagramFactoryHolder factories; 47 private Map durableSubscribers; 48 49 private IConfigurableRouter router, ackRouter; 52 private IRouterStatistics routerStats; 53 54 private transient volatile int nMessagesIn, nMessagesOut, nMessagesDropped; 56 private transient volatile int accumLatency; 57 private transient long startTime; 58 59 private IOverflowHandler overflow; 61 62 72 public DatagramProc(ISettingsRepository journal, 73 DatagramFactoryHolder factories) 74 throws java.io.IOException 75 { 76 this(journal, factories, null); 77 } 78 79 91 public DatagramProc(ISettingsRepository journal, 92 DatagramFactoryHolder factories, 93 IOverflowHandler handler) 94 throws java.io.IOException 95 { 96 this.router = new Router(); 97 this.ackRouter = new Router(); 98 this.routerStats = this; 99 this.journal = journal; 100 this.factories = factories; 101 this.durableSubscribers = new HashMap(); 102 restoreDurableSubscribers(); 103 104 startTime = System.currentTimeMillis(); 106 107 try 109 { 110 if (OVERFLOW_HANDLER_CLASS.length() > 0) 111 { 112 Class overflowClass = Class.forName(OVERFLOW_HANDLER_CLASS); 113 Constructor c = overflowClass.getConstructor(new Class [] {String .class}); 114 overflow = (IOverflowHandler)c.newInstance(new Object [] {OVERFLOW_HANDLER_INIT}); 115 } 116 else 117 { 118 throw new InstantiationException (); 119 } 120 } 121 catch (Exception e) 122 { 123 125 overflow = new TTLOverflowHandler(INITIAL_TIMEOUT, 132 TIMEOUT_FACTOR, 133 MAXIMUM_TIMEOUT, 134 true); 135 } 136 } 137 138 142 public void setServerName(String name) 143 { 144 router.setNodeLabel(name); 145 } 146 147 public void accept(IConnectionInfo ci) 148 { 149 router.addKnownNode(new ConnectionDestNode(ci)); 150 } 151 152 public void remove(IConnectionInfo ci) 153 { 154 ConnectionDestNode toRemove = new ConnectionDestNode(ci); 155 156 router.removeRoutesTo(toRemove); 157 router.removeKnownNode(toRemove); 158 } 159 160 public void process(IConnectionInfo conn, IDatagram d) 161 { 162 rules(conn, d); 165 } 166 167 private void rules(IConnectionInfo conn, IDatagram d) 168 { 169 if (d instanceof IAckDatagram) 173 { 174 ack(conn, (IAckDatagram)d); 175 } 176 if (d instanceof IControlDatagram) 177 { 178 control(conn, (IControlDatagram)d); 179 } 180 if (d instanceof IMessageDatagram) 181 { 182 msg(conn, (IMessageDatagram)d); 183 } 184 } 185 186 191 private void ack(IConnectionInfo conn, IAckDatagram ad) 192 { 193 log.debug("ack! " + ad.toString()); 194 195 Iterator iter = ackRouter.getRoutes(new StaticSourceDescriptor(conn.getId())).iterator(); 197 while(iter.hasNext()) 198 { 199 IDatagramEndpoint e = (IDatagramEndpoint)iter.next(); 200 e.deliver(ad); 201 } 202 } 203 204 private void control(IConnectionInfo conn, IControlDatagram cd) 205 { 206 log.debug("control! " + cd.toString()); 207 208 boolean ack = false; 209 try 210 { 211 ICommandSubGram command = cd.getCommandSubGram(); 214 215 if (command instanceof IStartDatagram) 216 { 217 router.addKnownNode(new ConnectionDestNode(conn)); 218 } 219 else if (command instanceof IStopDatagram) 220 { 221 router.removeKnownNode(new ConnectionDestNode(conn)); 222 } 223 else if (command instanceof ISubscribeDatagram) 224 { 225 ISubscribeDatagram sd = ((ISubscribeDatagram)command); 226 227 String selector = sd.getSelector(); 228 229 try 230 { 231 SourceSpec ss = new RegexpSourceSpec(RegexpHelper.xlat(sd.getTopicSpecification())); 233 234 if (selector != null && selector.length() > 0) 237 ss = new SelectorSourceSpec(ss, new SimpleSelector(selector)); 238 239 router.addRoute(ss, 241 new ConnectionDestNode(conn)); 242 } 243 catch (javax.jms.InvalidSelectorException e) 244 { 245 assert false : "Client allowed an invalid selector"; 249 } 250 } 251 else if (command instanceof IUnsubscribeDatagram) 252 { 253 IUnsubscribeDatagram sd = ((IUnsubscribeDatagram)command); 254 255 router.remove(new RegexpSourceSpec(RegexpHelper.xlat(sd.getTopicSpecification())), 256 new ConnectionDestNode(conn)); 257 } 258 else if (command instanceof IQueueStartDatagram) 259 { 260 IQueueStartDatagram iqs = ((IQueueStartDatagram)command); 261 setupDurableProxy(conn, 262 new RoundRobinArbiter(), 263 QUEUE_NAME_PREFIX + iqs.getQueueName(), 264 iqs.getQueueName(), 265 new LocalQueue(iqs.getQueueName()).getInternalTopicName()); 266 } 267 else if (command instanceof IQueueStopDatagram) 268 { 269 IQueueStopDatagram iqs = ((IQueueStopDatagram)command); 270 271 DurableSubscriptionProxy proxy = getDurableSubscriber(QUEUE_NAME_PREFIX + iqs.getQueueName()); 272 if (proxy != null) 273 { 274 unregisterDurableSubscriber(proxy); 276 277 proxy.disconnect(new ConnectionDestNode(conn)); 279 } 280 } 281 else if (command instanceof IQueueDeleteDatagram) 282 { 283 unsubscribeDurableSubscriber(QUEUE_NAME_PREFIX + ((IQueueDeleteDatagram)command).getQueueName()); 284 } 285 else if (command instanceof IDurableSubscribeDatagram) 286 { 287 IDurableSubscribeDatagram sd = ((IDurableSubscribeDatagram)command); 288 289 setupDurableProxy(conn, 290 new FailoverArbiter(), 291 sd.getSubscriptionName(), 292 sd.getSubscriptionName(), 293 sd.getTopicSpecification()); 294 } 295 else if (command instanceof IDurableUnsubscribeDatagram) 296 { 297 IDurableUnsubscribeDatagram sd = ((IDurableUnsubscribeDatagram)command); 298 299 unsubscribeDurableSubscriber(sd.getSubscriptionName()); 300 } 301 else if (command instanceof IDurableRecoverDatagram) 302 { 303 IDurableRecoverDatagram sd = ((IDurableRecoverDatagram)command); 304 305 DurableSubscriptionProxy proxy = getDurableSubscriber(sd.getSubscriptionName()); 306 if (proxy != null) 307 { 308 proxy.recover(); 309 } 310 } 311 else if (command instanceof IDurableGoingAwayDatagram) 312 { 313 IDurableGoingAwayDatagram sd = ((IDurableGoingAwayDatagram)command); 314 315 DurableSubscriptionProxy proxy = getDurableSubscriber(sd.getSubscriptionName()); 316 if (proxy != null) 317 { 318 unregisterDurableSubscriber(proxy); 320 321 proxy.disconnect(new ConnectionDestNode(conn)); 323 } 324 } 325 326 ack = true; 327 } 328 catch(Exception x) 329 { 330 log.error("Failed sending acknowledgement", x); 331 ack = false; 332 } 333 finally 334 { 335 log.debug("Sending acknowledgement " + ack + " to " + conn + " for RPC"); 336 337 sendAcknowledgement(conn, ack); 340 } 341 342 if (log.isDebugEnabled()) 344 log.debug(router.toString()); 345 } 346 347 private void msg(IConnectionInfo conn, IMessageDatagram md) 348 { 349 routerStats.messageIn(); 350 long time = System.currentTimeMillis(); 351 352 Iterator i = router.getRoutes(new MessageDatagramSourceDescriptor(md)).iterator(); 355 356 boolean failed = false; 357 if (!i.hasNext()) routerStats.messageDropped(); 358 while(i.hasNext()) 359 { 360 DatagramSink cdn = (DatagramSink)i.next(); 362 363 try 364 { 365 cdn.output(md, overflow); 367 } 368 catch(IOException x) 369 { 370 if (cdn instanceof ConnectionDestNode) 372 remove(((ConnectionDestNode)cdn).getConnection()); 373 } 374 catch(Exception other) 375 { 376 failed = true; 380 } 381 382 routerStats.messageOut(); 383 } 384 385 accumLatency += System.currentTimeMillis() - time; 387 388 if (shouldSendAck) 393 { 394 try 395 { 396 IAckDatagram ad = null; 397 if (!failed) 398 { 399 ad = factories.ackFactory().ack(md.getMessageId()); 400 } 401 else 402 { 403 ad = factories.ackFactory().nack(md.getMessageId()); 404 } 405 406 log.debug("sending " + conn + " message ID " + md.getMessageId() + " ack=" + !ad.isNegativeAck()); 408 conn.output(ad, overflow); 409 } 410 catch (IOException e) 411 { 412 log.debug("could not send message acknowledgement", e); 413 414 remove(conn); 416 } 417 } 418 } 419 420 private void sendAcknowledgement(IConnectionInfo conn, boolean ack) 421 { 422 try 423 { 424 conn.output(ack ? factories.ackFactory().ack() : factories.ackFactory().nack(), overflow); 425 } 426 catch (IOException e) 427 { 428 log.debug("could not send RPC ack due to I/O problem", e); 429 430 remove(conn); 433 } 434 } 435 436 438 private DurableSubscriptionProxy setupDurableProxy(IConnectionInfo conn, 439 DurableConnectionArbiter arbiter, 440 String durableName, 441 String displayName, 442 String subscription) 443 { 444 try 445 { 446 DurableSubscriptionProxy proxy = 449 createDurableSubscriber(arbiter, 450 durableName, 451 displayName, 452 subscription); 453 454 if (conn != null) 456 { 457 proxy.connect(new ConnectionDestNode(conn)); 458 459 registerDurableSubscriber(proxy, conn); 461 } 462 return proxy; 463 } 464 catch (java.io.IOException e) 465 { 466 log.fatal(e.getMessage()); 470 return null; 471 } 472 } 473 474 private static final String DURABLE_SUB_JOURNAL_PROP = "DurableSubscribers"; 475 private void addDurableSubscriber(DurableSubscriptionProxy p) 476 { 477 durableSubscribers.put(p.getName(), p); 479 480 List l = (List)journal.get(DURABLE_SUB_JOURNAL_PROP); 482 if (l == null) 483 { 484 l = new ArrayList(); 485 } 486 487 l.add(p); 488 journal.put(DURABLE_SUB_JOURNAL_PROP, l); 489 } 490 491 private void removeDurableSubscriber(DurableSubscriptionProxy p) 492 { 493 durableSubscribers.remove(p.getName()); 495 496 List l = (List)journal.get(DURABLE_SUB_JOURNAL_PROP); 498 if (l != null) 499 { 500 l.remove(p); 501 journal.put(DURABLE_SUB_JOURNAL_PROP, l); 502 } 503 504 router.removeRoutesTo(p); 506 router.removeKnownNode(p); 507 } 508 509 512 private void restoreDurableSubscribers() 513 { 514 List l = (List)journal.get(DURABLE_SUB_JOURNAL_PROP); 515 if (l != null) 516 { 517 Iterator iter = l.iterator(); 518 while (iter.hasNext()) 519 { 520 DurableSubscriptionProxy p = (DurableSubscriptionProxy)iter.next(); 521 durableSubscribers.put(p.getName(), p); 522 restoreDurableSubscriber(p); 523 } 524 } 525 } 526 527 530 private void restoreDurableSubscriber(DurableSubscriptionProxy p) 531 { 532 log.info("restoring durable subscriber " + p.getName()); 533 534 router.addKnownNode(p); 536 router.addRoute(new RegexpSourceSpec(RegexpHelper.xlat(p.getSubscription())), 537 p); 538 } 539 540 543 private DurableSubscriptionProxy createDurableSubscriber(DurableConnectionArbiter a, 544 String name, 545 String displayName, 546 String subscription) 547 throws FileNotFoundException, IOException 548 549 { 550 DurableSubscriptionProxy p = getDurableSubscriber(name); 553 554 if (p == null) 556 { 557 p = new DurableSubscriptionProxy(factories, a, name, displayName, subscription); 558 addDurableSubscriber(p); 559 restoreDurableSubscriber(p); 560 } 561 562 return p; 563 } 564 565 568 private DurableSubscriptionProxy getDurableSubscriber(String name) 569 { 570 return (DurableSubscriptionProxy)durableSubscribers.get(name); 571 } 572 573 577 private void unsubscribeDurableSubscriber(String name) 578 { 579 DurableSubscriptionProxy proxy = getDurableSubscriber(name); 580 if (proxy != null) 581 { 582 unregisterDurableSubscriber(proxy); 586 removeDurableSubscriber(proxy); 587 588 try 589 { 590 proxy.close(); 591 } 592 finally 593 { 594 proxy.unsubscribe(); 595 } 596 } 597 598 } 599 600 604 private void registerDurableSubscriber(DurableSubscriptionProxy proxy, 605 IConnectionInfo conn) 606 { 607 ackRouter.addKnownNode(proxy); 609 ackRouter.addRoute(new StaticSourceSpec(conn.getId()), 610 proxy); 611 } 612 613 private void unregisterDurableSubscriber(DurableSubscriptionProxy proxy) 614 { 615 ackRouter.removeKnownNode(proxy); 616 } 617 618 620 public void close() 621 { 622 } 624 625 631 public IRouterStatistics getRouterStatistics() 632 { 633 return this; 634 } 635 636 641 public Collection getConnections() 642 { 643 ArrayList al = new ArrayList(); 644 645 Iterator iter = this.router.getKnownNodes().iterator(); 646 while (iter.hasNext()) 647 { 648 RouteDestNode rdn = (RouteDestNode)iter.next(); 649 if (rdn instanceof ConnectionDestNode) 650 al.add(new MyConnectionAdmin((ConnectionDestNode)rdn)); 651 } 652 653 return al; 654 } 655 656 660 public String getServerName() 661 { 662 return this.router.getNodeLabel(); 663 } 664 665 672 public Collection getPersistentConsumers() throws java.rmi.RemoteException 673 { 674 ArrayList al = new ArrayList(); 675 676 Iterator iter = durableSubscribers.values().iterator(); 677 while (iter.hasNext()) 678 { 679 DurableSubscriptionProxy p = (DurableSubscriptionProxy)iter.next(); 680 al.add(new MyDurableAdmin(p)); 681 } 682 683 return al; 684 } 685 686 private class MyDurableAdmin 687 implements PersistentConsumerAdmin 688 { 689 private DurableSubscriptionProxy durable; 690 691 MyDurableAdmin(DurableSubscriptionProxy durable) 692 { 693 this.durable = durable; 694 } 695 696 702 public String getSubscription() 703 { 704 return durable.getSubscription(); 705 } 706 707 715 public String getDescription() throws java.rmi.RemoteException 716 { 717 return durable.getProxyFor().toHtml(); 718 } 719 720 724 public boolean isQueue() 725 { 726 return durable.getName().startsWith(QUEUE_NAME_PREFIX); 727 } 728 729 734 public boolean isActive() 735 { 736 return durable.getProxyFor().isOpen(); 737 } 738 739 746 public void close() throws javax.jms.JMSException 747 { 748 if (isActive()) 749 throw new javax.jms.JMSException ("Cannot delete a durable subscriber that is in use."); 750 751 unsubscribeDurableSubscriber(durable.getName()); 752 } 753 754 760 public String getName() 761 { 762 return durable.getDisplayName(); 763 } 764 } 765 766 private class MyConnectionAdmin 767 implements ConnectionAdmin 768 { 769 private ConnectionDestNode cdn; 770 771 MyConnectionAdmin(ConnectionDestNode conn) 772 { 773 this.cdn = conn; 774 } 775 776 public String getName() 777 { 778 return cdn.getDisplayName(); 779 } 780 781 public boolean equals(Object o) 782 { 783 try 784 { 785 if (o instanceof ConnectionAdmin) 786 return ((ConnectionAdmin)o).getName().equals(getName()); 787 else return false; 788 } 789 catch (java.rmi.RemoteException e) 790 { 791 return false; 792 } 793 } 794 795 803 public Collection getSubscriptions() 804 { 805 ArrayList l = new ArrayList(); 806 807 Iterator iter = router.getRoutesTo(cdn).iterator(); 808 while (iter.hasNext()) 809 { 810 SourceSpec ss = (SourceSpec)iter.next(); 811 l.add(ss.getDisplayName()); 812 } 813 814 return l; 815 } 816 817 821 public void close() 822 { 823 remove(cdn.getConnection()); 824 } 825 826 829 public void start() 830 { 831 router.addKnownNode(cdn); 832 } 833 834 838 public void stop() 839 { 840 router.removeKnownNode(cdn); 841 } 842 843 850 public boolean isActive() 851 { 852 boolean a = router.getKnownNodes().contains(cdn); 853 return a; 854 } 855 } 856 857 public void messageIn() 859 { 860 nMessagesIn++; 861 } 862 863 public void messageOut() 864 { 865 nMessagesOut++; 866 } 867 868 public void messageDropped() 869 { 870 nMessagesDropped++; 871 } 872 873 public boolean isRunning() 874 { 875 return true; 876 } 877 878 public Date getStartTime() 879 { 880 return new Date(startTime); 881 } 882 883 public int getMessagesDropped() 884 { 885 return nMessagesDropped; 886 } 887 888 public int getMessagesIn() 889 { 890 return nMessagesIn; 891 } 892 893 public int getMessagesOut() 894 { 895 return nMessagesOut; 896 } 897 898 public String getStatisticsAsText() 899 { 900 java.text.NumberFormat nf = java.text.NumberFormat.getNumberInstance(); 901 nf.setMaximumFractionDigits(1); 902 nf.setMinimumFractionDigits(0); 903 nf.setGroupingUsed(true); 904 905 java.text.NumberFormat pf = java.text.NumberFormat.getPercentInstance(); 906 pf.setMaximumFractionDigits(1); 907 pf.setMinimumFractionDigits(1); 908 909 long time = System.currentTimeMillis(); 910 long uptime = (long)((time - startTime) / 1000F); 911 912 StringBuffer sb = new StringBuffer (); 913 sb.append("\n\tMessages In: " + nf.format(nMessagesIn)); 914 sb.append("\n\tMessages Out: " + nf.format(nMessagesOut)); 915 sb.append("\n\tMessages Drop: " + nf.format(nMessagesDropped)); 916 917 if (nMessagesIn > 0) 918 { 919 sb.append("\n"); 920 sb.append("\n\tLatency: " + nf.format(accumLatency / ((double)nMessagesIn)) + " ms"); 921 sb.append("\n\tThroughput: " + nf.format(1000 * ((double)nMessagesIn) / accumLatency) + " msg/s"); 922 sb.append("\n\tUtilization: " + pf.format( accumLatency / (double)(time - startTime) )); 923 } 924 925 sb.append("\n\n\tUptime: " + formatMillis(uptime)); 926 927 return sb.toString(); 928 } 929 930 private String formatMillis(long ms) 931 { 932 java.text.NumberFormat tf = java.text.NumberFormat.getNumberInstance(); 933 tf.setMinimumIntegerDigits(2); 934 tf.setMaximumFractionDigits(3); 935 936 return tf.format( ms / 3600 ) + "h " + tf.format( (ms % 3600) / 60 ) + "m"; 937 } 938 939 } 940 | Popular Tags |