1 22 package org.jboss.test.jbossmq.test; 23 24 import javax.jms.Message ; 25 import javax.jms.Session ; 26 import javax.jms.Topic ; 27 import javax.jms.TopicConnection ; 28 import javax.jms.TopicConnectionFactory ; 29 import javax.jms.TopicPublisher ; 30 import javax.jms.TopicSession ; 31 import javax.management.MBeanServerConnection ; 32 import javax.management.ObjectName ; 33 import javax.naming.Context ; 34 35 import org.jboss.mx.util.ObjectNameFactory; 36 import org.jboss.test.JBossTestCase; 37 38 44 public class CleanTopicRemovalUnitTestCase extends JBossTestCase 45 { 46 static String TOPIC_FACTORY = "ConnectionFactory"; 47 static ObjectName destinationManager = ObjectNameFactory.create("jboss.mq:service=DestinationManager"); 48 static ObjectName messageCache = ObjectNameFactory.create("jboss.mq:service=MessageCache"); 49 50 TopicConnection topicConnection; 51 Topic topic; 52 53 public CleanTopicRemovalUnitTestCase(String name) throws Exception 54 { 55 super(name); 56 } 57 58 public void testCleanTopicRemoval() throws Throwable 59 { 60 createTopic(); 61 try 62 { 63 connect(); 64 try 65 { 66 TopicSession session = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); 67 TopicPublisher publisher = session.createPublisher(topic); 68 session.createSubscriber(topic); 69 topicConnection.start(); 70 71 getLog().debug("Publish message"); 72 Message message = session.createMessage(); 73 publisher.publish(message); 74 75 int beforeCacheCount = getCacheCount(); 76 getLog().debug("beforeCacheCount=" + beforeCacheCount); 77 78 removeTopic(); 79 80 int afterCacheCount = getCacheCount(); 81 getLog().debug("afterCacheCount=" + afterCacheCount); 82 83 assertEquals("Message should be removed ", afterCacheCount, beforeCacheCount - 1); 84 } 85 finally 86 { 87 disconnect(); 88 } 89 } 90 catch (Throwable t) 91 { 92 try 93 { 94 getLog().error("Error ", t); 95 removeTopic(); 96 } 97 catch (Throwable ignored) 98 { 99 } 100 throw t; 101 } 102 } 103 104 protected void connect() throws Exception 105 { 106 Context context = getInitialContext(); 107 TopicConnectionFactory topicFactory = (TopicConnectionFactory ) context.lookup(TOPIC_FACTORY); 108 topicConnection = topicFactory.createTopicConnection(); 109 110 getLog().debug("Connection established."); 111 } 112 113 protected void disconnect() 114 { 115 try 116 { 117 if (topicConnection != null) 118 topicConnection.close(); 119 } 120 catch (Throwable ignored) 121 { 122 getLog().warn("Ignored", ignored); 123 } 124 125 getLog().debug("Connection closed."); 126 } 127 128 protected void createTopic() throws Exception 129 { 130 getLog().debug("Create topic"); 131 MBeanServerConnection server = getServer(); 132 server.invoke(destinationManager, "createTopic", 133 new Object [] 134 { 135 "cleanTopicRemovalTest", 136 "topic/cleanTopicRemovalTest" 137 }, 138 new String [] 139 { 140 String .class.getName(), 141 String .class.getName() 142 } 143 ); 144 Context context = getInitialContext(); 145 topic = (Topic ) context.lookup("topic/cleanTopicRemovalTest"); 146 147 log.debug("Got topic " + topic); 148 } 149 150 protected void removeTopic() throws Exception 151 { 152 getLog().debug("Remove topic"); 153 MBeanServerConnection server = getServer(); 154 server.invoke(destinationManager, "destroyTopic", 155 new Object [] 156 { 157 "cleanTopicRemovalTest", 158 }, 159 new String [] 160 { 161 String .class.getName() 162 } 163 ); 164 } 165 166 protected int getCacheCount() throws Exception 167 { 168 MBeanServerConnection server = getServer(); 169 Integer cacheCount = (Integer ) server.getAttribute(messageCache, "TotalCacheSize"); 170 return cacheCount.intValue(); 171 } 172 } 173 174 | Popular Tags |