1 18 package org.apache.activemq.broker.jmx; 19 20 import org.apache.activemq.EmbeddedBrokerTestSupport; 21 import org.apache.activemq.broker.BrokerService; 22 import org.apache.activemq.broker.region.Topic; 23 24 import javax.jms.Connection ; 25 import javax.jms.Message ; 26 import javax.jms.MessageProducer ; 27 import javax.jms.Session ; 28 import javax.management.MBeanServer ; 29 import javax.management.MBeanServerInvocationHandler ; 30 import javax.management.MalformedObjectNameException ; 31 import javax.management.ObjectName ; 32 import javax.management.openmbean.CompositeData ; 33 import javax.management.openmbean.TabularData ; 34 35 import java.io.BufferedReader ; 36 import java.io.InputStreamReader ; 37 38 import junit.textui.TestRunner; 39 40 47 public class MBeanTest extends EmbeddedBrokerTestSupport { 48 49 private static boolean waitForKeyPress; 50 51 protected MBeanServer mbeanServer; 52 protected String domain = "org.apache.activemq"; 53 protected String clientID = "foo"; 54 55 protected Connection connection; 56 protected boolean transacted; 57 protected int authMode = Session.AUTO_ACKNOWLEDGE; 58 protected int messageCount = 10; 59 60 64 public static void main(String [] args) { 65 waitForKeyPress = true; 66 TestRunner.run(MBeanTest.class); 67 } 68 69 public void testMBeans() throws Exception { 70 connection = connectionFactory.createConnection(); 71 useConnection(connection); 72 73 assertQueueBrowseWorks(); 76 assertCreateAndDestroyDurableSubscriptions(); 77 assertConsumerCounts(); 78 } 79 80 public void testMoveMessagesBySelector() throws Exception { 81 connection = connectionFactory.createConnection(); 82 useConnection(connection); 83 84 ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + getDestinationString() + ",BrokerName=localhost"); 85 86 QueueViewMBean queue = (QueueViewMBean) MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true); 87 88 String newDestination = "test.new.destination." + getClass() + "." + getName(); 89 queue.moveMatchingMessagesTo("counter > 2", newDestination ); 90 91 queueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + newDestination + ",BrokerName=localhost"); 92 93 queue = (QueueViewMBean) MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true); 94 95 assertTrue("Should have at least one message in the queue: " + queueViewMBeanName, queue.getQueueSize() > 0); 96 97 queue.removeMatchingMessages("counter > 2"); 99 100 assertEquals("Should have no more messages in the queue: " + queueViewMBeanName, 0, queue.getQueueSize()); 101 } 102 103 public void testCopyMessagesBySelector() throws Exception { 104 connection = connectionFactory.createConnection(); 105 useConnection(connection); 106 107 ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + getDestinationString() + ",BrokerName=localhost"); 108 109 QueueViewMBean queue = (QueueViewMBean) MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true); 110 111 String newDestination = "test.new.destination." + getClass() + "." + getName(); 112 long queueSize = queue.getQueueSize(); 113 queue.copyMatchingMessagesTo("counter > 2", newDestination); 114 115 assertEquals("Should have same number of messages in the queue: " + queueViewMBeanName, queueSize, queueSize); 116 117 queueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + newDestination + ",BrokerName=localhost"); 118 119 queue = (QueueViewMBean) MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true); 120 121 log.info("Queue: " + queueViewMBeanName + " now has: " + queue.getQueueSize() + " message(s)"); 122 123 assertTrue("Should have at least one message in the queue: " + queueViewMBeanName, queue.getQueueSize() > 0); 124 125 queue.removeMatchingMessages("counter > 2"); 127 128 assertEquals("Should have no more messages in the queue: " + queueViewMBeanName, 0, queue.getQueueSize()); 129 } 130 131 132 protected void assertQueueBrowseWorks() throws Exception { 133 Integer mbeancnt = mbeanServer.getMBeanCount(); 134 echo("Mbean count :" + mbeancnt); 135 136 ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + getDestinationString() + ",BrokerName=localhost"); 137 138 echo("Create QueueView MBean..."); 139 QueueViewMBean proxy = (QueueViewMBean) MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true); 140 141 long concount = proxy.getConsumerCount(); 142 echo("Consumer Count :" + concount); 143 long messcount = proxy.getQueueSize(); 144 echo("current number of messages in the queue :" + messcount); 145 146 CompositeData [] compdatalist = proxy.browse(); 148 if (compdatalist.length == 0) { 149 fail("There is no message in the queue:"); 150 } 151 String [] messageIDs = new String [compdatalist.length]; 152 153 for (int i = 0; i < compdatalist.length; i++) { 154 CompositeData cdata = compdatalist[i]; 155 156 if (i == 0) { 157 echo("Columns: " + cdata.getCompositeType().keySet()); 158 } 159 messageIDs[i] = (String ) cdata.get("JMSMessageID"); 160 echo("message " + i + " : " + cdata.values()); 161 } 162 163 TabularData table = proxy.browseAsTable(); 164 echo("Found tabular data: " + table); 165 assertTrue("Table should not be empty!", table.size() > 0); 166 167 assertEquals("Queue size", 10, proxy.getQueueSize()); 168 169 String messageID = messageIDs[0]; 170 String newDestinationName = "queue://dummy.test.cheese"; 171 echo("Attempting to copy: " + messageID + " to destination: " + newDestinationName); 172 proxy.copyMessageTo(messageID, newDestinationName); 173 174 assertEquals("Queue size", 10, proxy.getQueueSize()); 175 176 messageID = messageIDs[1]; 177 echo("Attempting to remove: " + messageID); 178 proxy.removeMessage(messageID); 179 180 assertEquals("Queue size", 9, proxy.getQueueSize()); 181 182 echo("Worked!"); 183 } 184 185 protected void assertCreateAndDestroyDurableSubscriptions() throws Exception { 186 ObjectName brokerName = assertRegisteredObjectName(domain + ":Type=Broker,BrokerName=localhost"); 188 echo("Create QueueView MBean..."); 189 BrokerViewMBean broker = (BrokerViewMBean) MBeanServerInvocationHandler.newProxyInstance(mbeanServer, brokerName, BrokerViewMBean.class, true); 190 191 broker.addTopic(getDestinationString()); 192 193 assertEquals("Durable subscriber count", 0, broker.getDurableTopicSubscribers().length); 194 195 String topicName = getDestinationString(); 196 String selector = null; 197 ObjectName name1 = broker.createDurableSubscriber(clientID, "subscriber1", topicName , selector); 198 broker.createDurableSubscriber(clientID, "subscriber2", topicName, selector); 199 assertEquals("Durable subscriber count", 2, broker.getDurableTopicSubscribers().length); 200 201 assertNotNull("Should have created an mbean name for the durable subscriber!", name1); 202 203 log.info("Created durable subscriber with name: " + name1); 204 205 broker.destroyDurableSubscriber(clientID, "subscriber1"); 207 assertEquals("Durable subscriber count", 1, broker.getDurableTopicSubscribers().length); 208 } 209 210 protected void assertConsumerCounts() throws Exception { 211 ObjectName brokerName = assertRegisteredObjectName(domain + ":Type=Broker,BrokerName=localhost"); 212 BrokerViewMBean broker = (BrokerViewMBean) MBeanServerInvocationHandler.newProxyInstance(mbeanServer, brokerName, BrokerViewMBean.class, true); 213 214 broker.addTopic(getDestinationString() + "1"); 216 broker.addTopic(getDestinationString() + "2"); 217 218 ObjectName topicObjName1 = assertRegisteredObjectName(domain + ":Type=Topic,BrokerName=localhost,Destination="+getDestinationString() + "1"); 219 ObjectName topicObjName2 = assertRegisteredObjectName(domain + ":Type=Topic,BrokerName=localhost,Destination="+getDestinationString() + "2"); 220 TopicViewMBean topic1 = (TopicViewMBean) MBeanServerInvocationHandler.newProxyInstance(mbeanServer, topicObjName1, TopicViewMBean.class, true); 221 TopicViewMBean topic2 = (TopicViewMBean) MBeanServerInvocationHandler.newProxyInstance(mbeanServer, topicObjName2, TopicViewMBean.class, true); 222 223 assertEquals("topic1 Durable subscriber count", 0, topic1.getConsumerCount()); 224 assertEquals("topic2 Durable subscriber count", 0, topic2.getConsumerCount()); 225 226 String topicName = getDestinationString(); 227 String selector = null; 228 229 broker.createDurableSubscriber(clientID, "topic1.subscriber1", topicName + "1", selector); 231 broker.createDurableSubscriber(clientID, "topic2.subscriber1", topicName + "2", selector); 232 233 assertEquals("topic1 Durable subscriber count", 1, topic1.getConsumerCount()); 234 assertEquals("topic2 Durable subscriber count", 1, topic2.getConsumerCount()); 235 236 broker.createDurableSubscriber(clientID, "topic1.subscriber2", topicName + "1", selector); 238 239 assertEquals("topic1 Durable subscriber count", 2, topic1.getConsumerCount()); 240 assertEquals("topic2 Durable subscriber count", 1, topic2.getConsumerCount()); 241 242 broker.destroyDurableSubscriber(clientID, "topic1.subscriber1"); 244 245 assertEquals("topic1 Durable subscriber count", 1, topic1.getConsumerCount()); 246 assertEquals("topic2 Durable subscriber count", 1, topic2.getConsumerCount()); 247 248 broker.destroyDurableSubscriber(clientID, "topic2.subscriber1"); 250 251 assertEquals("topic1 Durable subscriber count", 1, topic1.getConsumerCount()); 252 assertEquals("topic2 Durable subscriber count", 0, topic2.getConsumerCount()); 253 254 broker.destroyDurableSubscriber(clientID, "topic1.subscriber2"); 256 257 assertEquals("topic1 Durable subscriber count", 0, topic1.getConsumerCount()); 258 assertEquals("topic2 Durable subscriber count", 0, topic2.getConsumerCount()); 259 } 260 261 protected ObjectName assertRegisteredObjectName(String name) throws MalformedObjectNameException , NullPointerException { 262 ObjectName objectName = new ObjectName (name); 263 if (mbeanServer.isRegistered(objectName)) { 264 echo("Bean Registered: " + objectName); 265 } 266 else { 267 fail("Could not find MBean!: " + objectName); 268 } 269 return objectName; 270 } 271 272 protected void setUp() throws Exception { 273 bindAddress = "tcp://localhost:61616"; 274 useTopic = false; 275 super.setUp(); 276 mbeanServer = broker.getManagementContext().getMBeanServer(); 277 } 278 279 protected void tearDown() throws Exception { 280 if (waitForKeyPress) { 281 System.out.println(); 284 System.out.println("Press enter to terminate the program."); 285 System.out.println("In the meantime you can use your JMX console to view the current MBeans"); 286 BufferedReader reader = new BufferedReader (new InputStreamReader (System.in)); 287 reader.readLine(); 288 } 289 290 if (connection != null) { 291 connection.close(); 292 connection = null; 293 } 294 super.tearDown(); 295 } 296 297 protected BrokerService createBroker() throws Exception { 298 BrokerService answer = new BrokerService(); 299 answer.setUseJmx(true); 300 answer.setEnableStatistics(true); 301 answer.setPersistent(false); 302 answer.addConnector(bindAddress); 303 return answer; 304 } 305 306 protected void useConnection(Connection connection) throws Exception { 307 connection.setClientID(clientID); 308 connection.start(); 309 Session session = connection.createSession(transacted, authMode); 310 destination = createDestination(); 311 MessageProducer producer = session.createProducer(destination); 312 for (int i = 0; i < messageCount; i++) { 313 Message message = session.createTextMessage("Message: " + i); 314 message.setIntProperty("counter", i); 315 producer.send(message); 316 } 317 Thread.sleep(1000); 318 } 319 320 protected void echo(String text) { 321 log.info(text); 322 } 323 } 324 | Popular Tags |