|                                                                                                              1
 18  package org.apache.activemq.broker.jmx;
 19
 20  import org.apache.activemq.EmbeddedBrokerTestSupport;
 21  import org.apache.activemq.broker.BrokerService;
 22  import org.apache.activemq.broker.region.Topic;
 23
 24  import javax.jms.Connection
  ; 25  import javax.jms.Message
  ; 26  import javax.jms.MessageProducer
  ; 27  import javax.jms.Session
  ; 28  import javax.management.MBeanServer
  ; 29  import javax.management.MBeanServerInvocationHandler
  ; 30  import javax.management.MalformedObjectNameException
  ; 31  import javax.management.ObjectName
  ; 32  import javax.management.openmbean.CompositeData
  ; 33  import javax.management.openmbean.TabularData
  ; 34
 35  import java.io.BufferedReader
  ; 36  import java.io.InputStreamReader
  ; 37
 38  import junit.textui.TestRunner;
 39
 40
 47  public class MBeanTest extends EmbeddedBrokerTestSupport {
 48
 49      private static boolean waitForKeyPress;
 50
 51      protected MBeanServer
  mbeanServer; 52      protected String
  domain = "org.apache.activemq"; 53      protected String
  clientID = "foo"; 54
 55      protected Connection
  connection; 56      protected boolean transacted;
 57      protected int authMode = Session.AUTO_ACKNOWLEDGE;
 58      protected int messageCount = 10;
 59
 60
 64      public static void main(String
  [] args) { 65          waitForKeyPress = true;
 66          TestRunner.run(MBeanTest.class);
 67      }
 68
 69      public void testMBeans() throws Exception
  { 70          connection = connectionFactory.createConnection();
 71          useConnection(connection);
 72
 73                          assertQueueBrowseWorks();
 76          assertCreateAndDestroyDurableSubscriptions();
 77          assertConsumerCounts();
 78      }
 79
 80      public void testMoveMessagesBySelector() throws Exception
  { 81          connection = connectionFactory.createConnection();
 82          useConnection(connection);
 83
 84          ObjectName
  queueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + getDestinationString() + ",BrokerName=localhost"); 85
 86          QueueViewMBean queue = (QueueViewMBean) MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
 87
 88          String
  newDestination = "test.new.destination." + getClass() + "." + getName(); 89          queue.moveMatchingMessagesTo("counter > 2", newDestination );
 90
 91          queueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + newDestination + ",BrokerName=localhost");
 92
 93          queue = (QueueViewMBean) MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
 94
 95          assertTrue("Should have at least one message in the queue: " + queueViewMBeanName, queue.getQueueSize() > 0);
 96
 97                  queue.removeMatchingMessages("counter > 2");
 99
 100         assertEquals("Should have no more messages in the queue: " + queueViewMBeanName, 0, queue.getQueueSize());
 101     }
 102
 103     public void testCopyMessagesBySelector() throws Exception
  { 104         connection = connectionFactory.createConnection();
 105         useConnection(connection);
 106
 107         ObjectName
  queueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + getDestinationString() + ",BrokerName=localhost"); 108
 109         QueueViewMBean queue = (QueueViewMBean) MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
 110
 111         String
  newDestination = "test.new.destination." + getClass() + "." + getName(); 112         long queueSize = queue.getQueueSize();
 113         queue.copyMatchingMessagesTo("counter > 2", newDestination);
 114
 115         assertEquals("Should have same number of messages in the queue: " + queueViewMBeanName, queueSize, queueSize);
 116
 117         queueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + newDestination + ",BrokerName=localhost");
 118
 119         queue = (QueueViewMBean) MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
 120
 121         log.info("Queue: " + queueViewMBeanName + " now has: " + queue.getQueueSize() + " message(s)");
 122
 123         assertTrue("Should have at least one message in the queue: " + queueViewMBeanName, queue.getQueueSize() > 0);
 124
 125                 queue.removeMatchingMessages("counter > 2");
 127
 128         assertEquals("Should have no more messages in the queue: " + queueViewMBeanName, 0, queue.getQueueSize());
 129     }
 130
 131
 132     protected void assertQueueBrowseWorks() throws Exception
  { 133         Integer
  mbeancnt = mbeanServer.getMBeanCount(); 134         echo("Mbean count :" + mbeancnt);
 135
 136         ObjectName
  queueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + getDestinationString() + ",BrokerName=localhost"); 137
 138         echo("Create QueueView MBean...");
 139         QueueViewMBean proxy = (QueueViewMBean) MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
 140
 141         long concount = proxy.getConsumerCount();
 142         echo("Consumer Count :" + concount);
 143         long messcount = proxy.getQueueSize();
 144         echo("current number of messages in the queue :" + messcount);
 145
 146                 CompositeData
  [] compdatalist = proxy.browse(); 148         if (compdatalist.length == 0) {
 149             fail("There is no message in the queue:");
 150         }
 151         String
  [] messageIDs = new String  [compdatalist.length]; 152
 153         for (int i = 0; i < compdatalist.length; i++) {
 154             CompositeData
  cdata = compdatalist[i]; 155
 156             if (i == 0) {
 157                 echo("Columns: " + cdata.getCompositeType().keySet());
 158             }
 159             messageIDs[i] = (String
  ) cdata.get("JMSMessageID"); 160             echo("message " + i + " : " + cdata.values());
 161         }
 162
 163         TabularData
  table = proxy.browseAsTable(); 164         echo("Found tabular data: " + table);
 165         assertTrue("Table should not be empty!", table.size() > 0);
 166
 167         assertEquals("Queue size", 10, proxy.getQueueSize());
 168
 169         String
  messageID = messageIDs[0]; 170         String
  newDestinationName = "queue://dummy.test.cheese"; 171         echo("Attempting to copy: " + messageID + " to destination: " + newDestinationName);
 172         proxy.copyMessageTo(messageID, newDestinationName);
 173
 174         assertEquals("Queue size", 10, proxy.getQueueSize());
 175
 176         messageID = messageIDs[1];
 177         echo("Attempting to remove: " + messageID);
 178         proxy.removeMessage(messageID);
 179
 180         assertEquals("Queue size", 9, proxy.getQueueSize());
 181
 182         echo("Worked!");
 183     }
 184
 185     protected void assertCreateAndDestroyDurableSubscriptions() throws Exception
  { 186                 ObjectName
  brokerName = assertRegisteredObjectName(domain + ":Type=Broker,BrokerName=localhost"); 188         echo("Create QueueView MBean...");
 189         BrokerViewMBean broker = (BrokerViewMBean) MBeanServerInvocationHandler.newProxyInstance(mbeanServer, brokerName, BrokerViewMBean.class, true);
 190
 191         broker.addTopic(getDestinationString());
 192
 193         assertEquals("Durable subscriber count", 0, broker.getDurableTopicSubscribers().length);
 194
 195         String
  topicName = getDestinationString(); 196         String
  selector = null; 197         ObjectName
  name1 = broker.createDurableSubscriber(clientID, "subscriber1", topicName , selector); 198         broker.createDurableSubscriber(clientID, "subscriber2", topicName, selector);
 199         assertEquals("Durable subscriber count", 2, broker.getDurableTopicSubscribers().length);
 200
 201         assertNotNull("Should have created an mbean name for the durable subscriber!", name1);
 202
 203         log.info("Created durable subscriber with name: "  + name1);
 204
 205                 broker.destroyDurableSubscriber(clientID, "subscriber1");
 207         assertEquals("Durable subscriber count", 1, broker.getDurableTopicSubscribers().length);
 208     }
 209
 210     protected void assertConsumerCounts() throws Exception
  { 211         ObjectName
  brokerName = assertRegisteredObjectName(domain + ":Type=Broker,BrokerName=localhost"); 212         BrokerViewMBean broker = (BrokerViewMBean) MBeanServerInvocationHandler.newProxyInstance(mbeanServer, brokerName, BrokerViewMBean.class, true);
 213
 214                 broker.addTopic(getDestinationString() + "1");
 216         broker.addTopic(getDestinationString() + "2");
 217
 218         ObjectName
  topicObjName1 = assertRegisteredObjectName(domain + ":Type=Topic,BrokerName=localhost,Destination="+getDestinationString() + "1"); 219         ObjectName
  topicObjName2 = assertRegisteredObjectName(domain + ":Type=Topic,BrokerName=localhost,Destination="+getDestinationString() + "2"); 220         TopicViewMBean topic1 = (TopicViewMBean) MBeanServerInvocationHandler.newProxyInstance(mbeanServer, topicObjName1, TopicViewMBean.class, true);
 221         TopicViewMBean topic2 = (TopicViewMBean) MBeanServerInvocationHandler.newProxyInstance(mbeanServer, topicObjName2, TopicViewMBean.class, true);
 222
 223         assertEquals("topic1 Durable subscriber count", 0, topic1.getConsumerCount());
 224         assertEquals("topic2 Durable subscriber count", 0, topic2.getConsumerCount());
 225
 226         String
  topicName = getDestinationString(); 227         String
  selector = null; 228
 229                 broker.createDurableSubscriber(clientID, "topic1.subscriber1", topicName + "1", selector);
 231         broker.createDurableSubscriber(clientID, "topic2.subscriber1", topicName + "2", selector);
 232
 233         assertEquals("topic1 Durable subscriber count", 1, topic1.getConsumerCount());
 234         assertEquals("topic2 Durable subscriber count", 1, topic2.getConsumerCount());
 235
 236                 broker.createDurableSubscriber(clientID, "topic1.subscriber2", topicName + "1", selector);
 238
 239         assertEquals("topic1 Durable subscriber count", 2, topic1.getConsumerCount());
 240         assertEquals("topic2 Durable subscriber count", 1, topic2.getConsumerCount());
 241
 242                 broker.destroyDurableSubscriber(clientID, "topic1.subscriber1");
 244
 245         assertEquals("topic1 Durable subscriber count", 1, topic1.getConsumerCount());
 246         assertEquals("topic2 Durable subscriber count", 1, topic2.getConsumerCount());
 247
 248                 broker.destroyDurableSubscriber(clientID, "topic2.subscriber1");
 250
 251         assertEquals("topic1 Durable subscriber count", 1, topic1.getConsumerCount());
 252         assertEquals("topic2 Durable subscriber count", 0, topic2.getConsumerCount());
 253
 254                 broker.destroyDurableSubscriber(clientID, "topic1.subscriber2");
 256
 257         assertEquals("topic1 Durable subscriber count", 0, topic1.getConsumerCount());
 258         assertEquals("topic2 Durable subscriber count", 0, topic2.getConsumerCount());
 259     }
 260
 261     protected ObjectName
  assertRegisteredObjectName(String  name) throws MalformedObjectNameException  , NullPointerException  { 262         ObjectName
  objectName = new ObjectName  (name); 263         if (mbeanServer.isRegistered(objectName)) {
 264             echo("Bean Registered: " + objectName);
 265         }
 266         else {
 267             fail("Could not find MBean!: " + objectName);
 268         }
 269         return objectName;
 270     }
 271
 272     protected void setUp() throws Exception
  { 273         bindAddress = "tcp://localhost:61616";
 274         useTopic = false;
 275         super.setUp();
 276         mbeanServer = broker.getManagementContext().getMBeanServer();
 277     }
 278
 279     protected void tearDown() throws Exception
  { 280         if (waitForKeyPress) {
 281                                     System.out.println();
 284             System.out.println("Press enter to terminate the program.");
 285             System.out.println("In the meantime you can use your JMX console to view the current MBeans");
 286             BufferedReader
  reader = new BufferedReader  (new InputStreamReader  (System.in)); 287             reader.readLine();
 288         }
 289
 290         if (connection != null) {
 291             connection.close();
 292             connection = null;
 293         }
 294         super.tearDown();
 295     }
 296
 297     protected BrokerService createBroker() throws Exception
  { 298         BrokerService answer = new BrokerService();
 299         answer.setUseJmx(true);
 300         answer.setEnableStatistics(true);
 301         answer.setPersistent(false);
 302         answer.addConnector(bindAddress);
 303         return answer;
 304     }
 305
 306     protected void useConnection(Connection
  connection) throws Exception  { 307         connection.setClientID(clientID);
 308         connection.start();
 309         Session
  session = connection.createSession(transacted, authMode); 310         destination = createDestination();
 311         MessageProducer
  producer = session.createProducer(destination); 312         for (int i = 0; i < messageCount; i++) {
 313             Message
  message = session.createTextMessage("Message: " + i); 314             message.setIntProperty("counter", i);
 315             producer.send(message);
 316         }
 317         Thread.sleep(1000);
 318     }
 319
 320     protected void echo(String
  text) { 321         log.info(text);
 322     }
 323 }
 324
                                                                                                                                                                                                             |                                                                       
 
 
 
 
 
                                                                                   Popular Tags                                                                                                                                                                                              |