1 22 package org.jboss.mq.server; 23 24 import java.util.Collection ; 25 import java.util.HashMap ; 26 import java.util.Iterator ; 27 import java.util.Map ; 28 import java.util.TreeMap ; 29 30 import javax.jms.Destination ; 31 import javax.jms.InvalidDestinationException ; 32 import javax.jms.JMSException ; 33 import javax.jms.Queue ; 34 import javax.jms.TemporaryQueue ; 35 import javax.jms.TemporaryTopic ; 36 import javax.jms.Topic ; 37 import javax.transaction.xa.Xid ; 38 39 import org.jboss.mq.AcknowledgementRequest; 40 import org.jboss.mq.ConnectionToken; 41 import org.jboss.mq.DurableSubscriptionID; 42 import org.jboss.mq.SpyDestination; 43 import org.jboss.mq.SpyJMSException; 44 import org.jboss.mq.SpyMessage; 45 import org.jboss.mq.SpyQueue; 46 import org.jboss.mq.SpyTemporaryQueue; 47 import org.jboss.mq.SpyTemporaryTopic; 48 import org.jboss.mq.SpyTopic; 49 import org.jboss.mq.SpyTransactionRolledBackException; 50 import org.jboss.mq.Subscription; 51 import org.jboss.mq.TransactionRequest; 52 import org.jboss.mq.pm.PersistenceManager; 53 import org.jboss.mq.pm.Tx; 54 import org.jboss.mq.pm.TxManager; 55 import org.jboss.mq.sm.StateManager; 56 import org.jboss.util.threadpool.ThreadPool; 57 import org.jboss.util.timeout.TimeoutFactory; 58 59 import EDU.oswego.cs.dl.util.concurrent.ConcurrentReaderHashMap; 60 61 70 public class JMSDestinationManager extends JMSServerInterceptorSupport 71 { 72 73 public final static String JBOSS_VERSION = "JBossMQ Version 4.0"; 74 75 76 public Map destinations = new ConcurrentReaderHashMap(); 77 78 79 public Map closingDestinations = new ConcurrentReaderHashMap(); 80 81 82 public ThreadPool threadPool; 83 84 85 public ThreadGroup threadGroup; 86 87 88 public TimeoutFactory timeoutFactory; 89 90 91 Map clientConsumers = new ConcurrentReaderHashMap(); 92 93 94 private int lastID = 1; 95 96 97 private int lastTemporaryTopic = 1; 98 99 private Object lastTemporaryTopicLock = new Object (); 100 101 102 private int lastTemporaryQueue = 1; 103 104 private Object lastTemporaryQueueLock = new Object (); 105 106 107 private StateManager stateManager; 108 109 110 private PersistenceManager persistenceManager; 111 112 113 private MessageCache messageCache; 114 115 private Object stateLock = new Object (); 116 117 private Object idLock = new Object (); 118 119 124 private boolean stopped = true; 125 126 127 BasicQueueParameters parameters; 128 129 132 public JMSDestinationManager(BasicQueueParameters parameters) 133 { 134 this.parameters = parameters; 135 } 136 137 144 public void setEnabled(ConnectionToken dc, boolean enabled) throws JMSException 145 { 146 ClientConsumer ClientConsumer = getClientConsumer(dc); 147 ClientConsumer.setEnabled(enabled); 148 } 149 150 155 public void setStateManager(StateManager newStateManager) 156 { 157 stateManager = newStateManager; 158 } 159 160 165 public void setPersistenceManager(org.jboss.mq.pm.PersistenceManager newPersistenceManager) 166 { 167 persistenceManager = newPersistenceManager; 168 } 169 170 177 public boolean isStopped() 178 { 179 synchronized (stateLock) 180 { 181 return this.stopped; 182 } 183 } 184 185 protected void checkStopped() throws IllegalStateException 186 { 187 if (isStopped()) 188 throw new IllegalStateException ("Server is stopped."); 189 } 190 191 195 public int getClientCount() 196 { 197 return clientConsumers.size(); 198 } 199 200 205 public HashMap getClients() 206 { 207 return new HashMap (clientConsumers); 208 } 209 210 public void setThreadPool(ThreadPool threadPool) 211 { 212 this.threadPool = threadPool; 213 } 214 215 public ThreadPool getThreadPool() 216 { 217 return threadPool; 218 } 219 220 public void setThreadGroup(ThreadGroup threadGroup) 221 { 222 this.threadGroup = threadGroup; 223 } 224 225 public ThreadGroup getThreadGroup() 226 { 227 return threadGroup; 228 } 229 230 public TimeoutFactory getTimeoutFactory() 231 { 232 return timeoutFactory; 233 } 234 235 240 public String getID() 241 { 242 String ID = null; 243 244 while (isStopped() == false) 245 { 246 if (stateManager == null) 247 throw new IllegalStateException ("No statemanager"); 248 try 249 { 250 synchronized (idLock) 251 { 252 ID = "ID:" + (new Integer (lastID++).toString()); 253 } 254 stateManager.addLoggedOnClientId(ID); 255 break; 256 } 257 catch (Exception e) 258 { 259 } 260 } 261 262 checkStopped(); 263 264 return ID; 265 } 266 267 public TemporaryTopic getTemporaryTopic(ConnectionToken dc) throws JMSException 268 { 269 checkStopped(); 270 271 String topicName; 272 synchronized (lastTemporaryTopicLock) 273 { 274 topicName = "JMS_TT" + (new Integer (lastTemporaryTopic++).toString()); 275 } 276 SpyTemporaryTopic topic = new SpyTemporaryTopic(topicName, dc); 277 278 ClientConsumer ClientConsumer = getClientConsumer(dc); 279 JMSDestination queue = new JMSTopic(topic, ClientConsumer, this, parameters); 280 destinations.put(topic, queue); 281 282 return topic; 283 } 284 285 public TemporaryQueue getTemporaryQueue(ConnectionToken dc) throws JMSException 286 { 287 checkStopped(); 288 289 String queueName; 290 synchronized (lastTemporaryQueueLock) 291 { 292 queueName = "JMS_TQ" + (new Integer (lastTemporaryQueue++).toString()); 293 } 294 SpyTemporaryQueue newQueue = new SpyTemporaryQueue(queueName, dc); 295 296 ClientConsumer ClientConsumer = getClientConsumer(dc); 297 JMSDestination queue = new JMSQueue(newQueue, ClientConsumer, this, parameters); 298 destinations.put(newQueue, queue); 299 300 return newQueue; 301 } 302 303 public ClientConsumer getClientConsumer(ConnectionToken dc) throws JMSException 304 { 305 ClientConsumer cq = (ClientConsumer) clientConsumers.get(dc); 306 if (cq == null) 307 { 308 cq = new ClientConsumer(this, dc); 309 clientConsumers.put(dc, cq); 310 } 311 return cq; 312 } 313 314 public JMSDestination getJMSDestination(SpyDestination dest) 315 { 316 return (JMSDestination) destinations.get(dest); 317 } 318 319 326 protected JMSDestination getPossiblyClosingJMSDestination(SpyDestination dest) 327 { 328 JMSDestination result = (JMSDestination) destinations.get(dest); 329 if (result == null) 330 result = (JMSDestination) closingDestinations.get(dest); 331 return result; 332 } 333 334 339 public StateManager getStateManager() 340 { 341 return stateManager; 342 } 343 344 349 public PersistenceManager getPersistenceManager() 350 { 351 return persistenceManager; 352 } 353 354 357 public void startServer() 358 { 359 synchronized (stateLock) 360 { 361 this.stopped = false; 362 this.timeoutFactory = new TimeoutFactory(this.threadPool); 363 } 364 } 365 366 369 public void stopServer() 370 { 371 synchronized (stateLock) 372 { 373 this.stopped = true; 374 this.timeoutFactory.cancel(); 375 376 for (Iterator i = clientConsumers.keySet().iterator(); i.hasNext();) 377 { 378 ConnectionToken token = (ConnectionToken) i.next(); 379 try 380 { 381 connectionClosing(token); 382 } 383 catch (Throwable t) 384 { 385 log.trace("Ignored error closing client connection " + token, t); 386 } 387 } 388 } 389 } 390 391 public void checkID(String ID) throws JMSException 392 { 393 checkStopped(); 394 stateManager.addLoggedOnClientId(ID); 395 } 396 397 public void addMessage(ConnectionToken dc, SpyMessage val) throws JMSException 398 { 399 addMessage(dc, val, null); 400 } 401 402 public void addMessage(ConnectionToken dc, SpyMessage val, Tx txId) throws JMSException 403 { 404 checkStopped(); 405 JMSDestination queue = (JMSDestination) destinations.get(val.getJMSDestination()); 406 if (queue == null) 407 throw new InvalidDestinationException ("This destination does not exist! " + val.getJMSDestination()); 408 409 val.setJMSRedelivered(false); 411 val.header.jmsProperties.remove(SpyMessage.PROPERTY_REDELIVERY_COUNT); 412 413 val.setReadOnlyMode(); 415 queue.addMessage(val, txId); 416 } 417 418 public void transact(ConnectionToken dc, TransactionRequest t) throws JMSException 419 { 420 checkStopped(); 421 boolean trace = log.isTraceEnabled(); 422 TxManager txManager = persistenceManager.getTxManager(); 423 if (t.requestType == TransactionRequest.ONE_PHASE_COMMIT_REQUEST) 424 { 425 Tx txId = txManager.createTx(); 426 if (trace) 427 log.trace(dc + " 1PC " + t.xid + " txId=" + txId.longValue()); 428 try 429 { 430 if (t.messages != null) 431 { 432 for (int i = 0; i < t.messages.length; i++) 433 { 434 addMessage(dc, t.messages[i], txId); 435 } 436 } 437 if (t.acks != null) 438 { 439 for (int i = 0; i < t.acks.length; i++) 440 { 441 acknowledge(dc, t.acks[i], txId); 442 } 443 } 444 txManager.commitTx(txId); 445 } 446 catch (JMSException e) 447 { 448 log.debug("Exception occured, rolling back transaction: ", e); 449 txManager.rollbackTx(txId); 450 throw new SpyTransactionRolledBackException("Transaction was rolled back.", e); 451 } 452 } 453 else if (t.requestType == TransactionRequest.TWO_PHASE_COMMIT_PREPARE_REQUEST) 454 { 455 Tx txId = txManager.createTx(dc, t.xid); 456 if (trace) 457 log.trace(dc + " 2PC PREPARE " + t.xid + " txId=" + txId.longValue()); 458 try 459 { 460 if (t.messages != null) 461 { 462 for (int i = 0; i < t.messages.length; i++) 463 { 464 addMessage(dc, t.messages[i], txId); 465 } 466 } 467 if (t.acks != null) 468 { 469 for (int i = 0; i < t.acks.length; i++) 470 { 471 acknowledge(dc, t.acks[i], txId); 472 } 473 } 474 475 txManager.markPrepared(dc, t.xid, txId); 476 } 477 catch (JMSException e) 478 { 479 log.debug("Exception occured, rolling back transaction: ", e); 480 txManager.rollbackTx(txId); 481 throw new SpyTransactionRolledBackException("Transaction was rolled back.", e); 482 } 483 } 484 else if (t.requestType == TransactionRequest.TWO_PHASE_COMMIT_ROLLBACK_REQUEST) 485 { 486 if (trace) 487 log.trace(dc + " 2PC ROLLBACK " + t.xid); 488 txManager.rollbackTx(dc, t.xid); 489 } 490 else if (t.requestType == TransactionRequest.TWO_PHASE_COMMIT_COMMIT_REQUEST) 491 { 492 if (trace) 493 log.trace(dc + " 2PC COMMIT " + t.xid); 494 txManager.commitTx(dc, t.xid); 495 } 496 } 497 498 public Xid [] recover(ConnectionToken dc, int flags) throws Exception 499 { 500 checkStopped(); 501 TxManager txManager = persistenceManager.getTxManager(); 502 return txManager.recover(dc, flags); 503 } 504 505 public void acknowledge(ConnectionToken dc, AcknowledgementRequest item) throws JMSException 506 { 507 acknowledge(dc, item, null); 508 } 509 510 public void acknowledge(ConnectionToken dc, AcknowledgementRequest item, Tx txId) throws JMSException 511 { 512 checkStopped(); 513 ClientConsumer cc = getClientConsumer(dc); 514 cc.acknowledge(item, txId); 515 } 516 517 public void connectionClosing(ConnectionToken dc) throws JMSException 518 { 519 if (dc == null) 520 return; 521 522 ClientConsumer cq = (ClientConsumer) clientConsumers.remove(dc); 524 if (cq != null) 525 cq.close(); 526 527 if (dc.getClientID() != null) 529 stateManager.removeLoggedOnClientId(dc.getClientID()); 530 531 Iterator i = destinations.entrySet().iterator(); 533 while (i.hasNext()) 534 { 535 Map.Entry entry = (Map.Entry ) i.next(); 536 JMSDestination sq = (JMSDestination) entry.getValue(); 537 if (sq != null) 538 { 539 ClientConsumer cc = sq.temporaryDestination; 540 if (cc != null && dc.equals(cc.connectionToken)) 541 { 542 i.remove(); 543 deleteTemporaryDestination(dc, sq); 544 } 545 } 546 } 547 try 549 { 550 if (dc.clientIL != null) 551 dc.clientIL.close(); 552 } 553 catch (Exception ex) 554 { 555 } 559 } 560 561 public void connectionFailure(ConnectionToken dc) throws JMSException 562 { 563 log.error("The connection to client " + dc.getClientID() + " failed."); 565 connectionClosing(dc); 566 } 567 568 public void subscribe(ConnectionToken dc, Subscription sub) throws JMSException 569 { 570 checkStopped(); 571 ClientConsumer clientConsumer = getClientConsumer(dc); 572 clientConsumer.addSubscription(sub); 573 } 574 575 public void unsubscribe(ConnectionToken dc, int subscriptionId) throws JMSException 576 { 577 checkStopped(); 578 ClientConsumer clientConsumer = getClientConsumer(dc); 579 clientConsumer.removeSubscription(subscriptionId); 580 } 581 582 public void destroySubscription(ConnectionToken dc, DurableSubscriptionID id) throws JMSException 583 { 584 checkStopped(); 585 getStateManager().setDurableSubscription(this, id, null); 586 } 587 588 public SpyMessage[] browse(ConnectionToken dc, Destination dest, String selector) throws JMSException 589 { 590 checkStopped(); 591 JMSDestination queue = (JMSDestination) destinations.get(dest); 592 if (queue == null) 593 throw new InvalidDestinationException ("That destination does not exist! " + dest); 594 if (!(queue instanceof JMSQueue)) 595 throw new JMSException ("That destination is not a queue"); 596 597 return ((JMSQueue) queue).browse(selector); 598 } 599 600 public SpyMessage receive(ConnectionToken dc, int subscriberId, long wait) throws JMSException 601 { 602 checkStopped(); 603 ClientConsumer clientConsumer = getClientConsumer(dc); 604 SpyMessage msg = clientConsumer.receive(subscriberId, wait); 605 return msg; 606 } 607 608 public Queue createQueue(ConnectionToken dc, String name) throws JMSException 609 { 610 checkStopped(); 611 SpyQueue newQueue = new SpyQueue(name); 612 if (!destinations.containsKey(newQueue)) 613 throw new JMSException ("This destination does not exist !" + newQueue); 614 return newQueue; 615 } 616 617 public Topic createTopic(ConnectionToken dc, String name) throws JMSException 618 { 619 checkStopped(); 620 SpyTopic newTopic = new SpyTopic(name); 621 if (!destinations.containsKey(newTopic)) 622 throw new JMSException ("This destination does not exist !" + newTopic); 623 return newTopic; 624 } 625 626 public Queue createQueue(String queueName) throws JMSException 627 { 628 checkStopped(); 629 630 SpyTemporaryQueue newQueue = new SpyTemporaryQueue(queueName, null); 631 632 JMSDestination queue = new JMSQueue(newQueue, null, this, parameters); 633 destinations.put(newQueue, queue); 634 635 return newQueue; 636 } 637 638 public Topic createTopic(String topicName) throws JMSException 639 { 640 checkStopped(); 641 642 SpyTemporaryTopic topic = new SpyTemporaryTopic(topicName, null); 643 644 JMSDestination queue = new JMSTopic(topic, null, this, parameters); 645 destinations.put(topic, queue); 646 647 return topic; 648 } 649 650 public void deleteTemporaryDestination(ConnectionToken dc, SpyDestination dest) throws JMSException 651 { 652 checkStopped(); 653 JMSDestination destination = (JMSDestination) destinations.get(dest); 654 if (destination == null) 655 throw new InvalidDestinationException ("That destination does not exist! " + destination); 656 657 if (destination.isInUse()) 658 throw new JMSException ("Cannot delete temporary queue, it is in use."); 659 660 destinations.remove(dest); 661 deleteTemporaryDestination(dc, destination); 662 } 663 664 protected void deleteTemporaryDestination(ConnectionToken dc, JMSDestination destination) throws JMSException 665 { 666 try 667 { 668 destination.removeAllMessages(); 669 } 670 catch (Exception e) 671 { 672 log.error("An exception happened while removing all messages from temporary destination " 673 + destination.getSpyDestination().getName(), e); 674 } 675 676 } 677 678 public String checkUser(String userName, String password) throws JMSException 679 { 680 checkStopped(); 681 return stateManager.checkUser(userName, password); 682 } 683 684 public String authenticate(String id, String password) throws JMSException 685 { 686 checkStopped(); 687 return null; 689 } 690 691 public void addDestination(JMSDestination destination) throws JMSException 692 { 693 if (destinations.containsKey(destination.getSpyDestination())) 694 throw new JMSException ("This destination has already been added to the server!"); 695 696 destinations.put(destination.getSpyDestination(), destination); 698 699 if (destination instanceof JMSTopic) 701 { 702 Collection durableSubs = getStateManager().getDurableSubscriptionIdsForTopic((SpyTopic) destination.getSpyDestination()); 703 for (Iterator i = durableSubs.iterator(); i.hasNext();) 704 { 705 DurableSubscriptionID sub = (DurableSubscriptionID) i.next(); 706 log.debug("creating the durable subscription for :" + sub); 707 ((JMSTopic) destination).createDurableSubscription(sub); 708 } 709 } 710 } 711 712 718 public void closeDestination(SpyDestination dest) throws JMSException 719 { 720 JMSDestination destination = (JMSDestination) destinations.remove(dest); 721 if (destination == null) 722 throw new InvalidDestinationException ("This destination is not open! " + dest); 723 724 log.debug("Closing destination " + dest); 725 726 closingDestinations.put(dest, destination); 728 try 729 { 730 destination.close(); 731 } 732 finally 733 { 734 closingDestinations.remove(dest); 735 } 736 } 737 738 public String toString() 739 { 740 return JBOSS_VERSION; 741 } 742 743 public void ping(ConnectionToken dc, long clientTime) throws JMSException 744 { 745 checkStopped(); 746 try 747 { 748 dc.clientIL.pong(System.currentTimeMillis()); 749 } 750 catch (Exception e) 751 { 752 throw new SpyJMSException("Could not pong", e); 753 } 754 } 755 756 760 public MessageCache getMessageCache() 761 { 762 return messageCache; 763 } 764 765 769 public void setMessageCache(MessageCache messageCache) 770 { 771 this.messageCache = messageCache; 772 } 773 774 public SpyTopic getDurableTopic(DurableSubscriptionID sub) throws JMSException 775 { 776 checkStopped(); 777 return getStateManager().getDurableTopic(sub); 778 } 779 780 public Subscription getSubscription(ConnectionToken dc, int subscriberId) throws JMSException 781 { 782 checkStopped(); 783 ClientConsumer clientConsumer = getClientConsumer(dc); 784 return clientConsumer.getSubscription(subscriberId); 785 } 786 787 792 public MessageCounter[] getMessageCounter() 793 { 794 TreeMap map = new TreeMap (); 796 Iterator i = destinations.values().iterator(); 797 798 while (i.hasNext()) 799 { 800 JMSDestination dest = (JMSDestination) i.next(); 801 802 MessageCounter[] counter = dest.getMessageCounter(); 803 804 for (int j = 0; j < counter.length; j++) 805 { 806 String key = counter[j].getDestinationName() + "-" + counter[j].getDestinationSubscription() + "-" 808 + (counter[j].getDestinationTopic() ? "Topic" : "Queue"); 809 810 map.put(key, counter[j]); 811 } 812 } 813 814 return (MessageCounter[]) map.values().toArray(new MessageCounter[0]); 815 } 816 817 820 public void resetMessageCounter() 821 { 822 Iterator i = destinations.values().iterator(); 823 824 while (i.hasNext()) 825 { 826 JMSDestination dest = (JMSDestination) i.next(); 827 828 MessageCounter[] counter = dest.getMessageCounter(); 829 830 for (int j = 0; j < counter.length; j++) 831 { 832 counter[j].resetCounter(); 833 } 834 } 835 } 836 837 public BasicQueueParameters getParameters() 838 { 839 return parameters; 840 } 841 } 842 | Popular Tags |