1 14 package org.apache.activemq.perf; 15 16 import java.io.File ; 17 import java.net.URI ; 18 import javax.jms.Connection ; 19 import javax.jms.JMSException ; 20 import javax.jms.MapMessage ; 21 import javax.jms.Message ; 22 import javax.jms.MessageProducer ; 23 import javax.jms.Session ; 24 import javax.jms.Topic ; 25 import javax.jms.TopicSubscriber ; 26 import junit.framework.AssertionFailedError; 27 import junit.framework.TestCase; 28 import org.apache.activemq.ActiveMQConnectionFactory; 29 import org.apache.activemq.broker.BrokerFactory; 30 import org.apache.activemq.broker.BrokerService; 31 import org.apache.activemq.store.journal.JournalPersistenceAdapterFactory; 32 import org.apache.activemq.store.kahadaptor.KahaPersistenceAdapter; 33 36 public class InactiveDurableTopicTest extends TestCase{ 37 private static final int MESSAGE_COUNT = 100000; 38 private static final String DEFAULT_PASSWORD=""; 39 private static final String USERNAME="testuser"; 40 private static final String CLIENTID="mytestclient"; 41 private static final String TOPIC_NAME="testevent"; 42 private static final String SUBID="subscription1"; 43 private static final int deliveryMode=javax.jms.DeliveryMode.PERSISTENT; 44 private static final int deliveryPriority=javax.jms.Message.DEFAULT_PRIORITY; 45 private Connection connection=null; 46 private MessageProducer publisher=null; 47 private TopicSubscriber subscriber=null; 48 private Topic topic=null; 49 private Session session=null; 50 ActiveMQConnectionFactory connectionFactory=null; 51 BrokerService broker; 52 53 protected void setUp() throws Exception { 54 super.setUp(); 55 broker=new BrokerService(); 56 57 broker.setPersistenceAdapter(new KahaPersistenceAdapter()); 58 65 broker.addConnector(ActiveMQConnectionFactory.DEFAULT_BROKER_URL); 66 broker.start(); 67 connectionFactory=new ActiveMQConnectionFactory(ActiveMQConnectionFactory.DEFAULT_BROKER_URL); 68 72 connectionFactory.setUseAsyncSend(true); 73 } 74 75 protected void tearDown() throws Exception { 76 super.tearDown(); 77 broker.stop(); 78 } 79 80 public void test1CreateSubscription() throws Exception { 81 try{ 82 85 connection=connectionFactory.createConnection(USERNAME,DEFAULT_PASSWORD); 86 assertNotNull(connection); 87 connection.setClientID(CLIENTID); 88 connection.start(); 89 session=connection.createSession(false,javax.jms.Session.CLIENT_ACKNOWLEDGE); 90 assertNotNull(session); 91 topic=session.createTopic(TOPIC_NAME); 92 assertNotNull(topic); 93 subscriber=session.createDurableSubscriber(topic,SUBID,"",false); 94 assertNotNull(subscriber); 95 subscriber.close(); 96 session.close(); 97 connection.close(); 98 }catch(JMSException ex){ 99 try{ 100 connection.close(); 101 }catch(Exception ignore){} 102 throw new AssertionFailedError("Create Subscription caught: "+ex); 103 } 104 } 105 106 public void test2ProducerTestCase(){ 107 111 try{ 112 connection=connectionFactory.createConnection(USERNAME,DEFAULT_PASSWORD); 113 assertNotNull(connection); 114 session=connection.createSession(false,javax.jms.Session.CLIENT_ACKNOWLEDGE); 115 assertNotNull(session); 116 topic=session.createTopic(TOPIC_NAME); 117 assertNotNull(topic); 118 publisher=session.createProducer(topic); 119 assertNotNull(publisher); 120 MapMessage msg=session.createMapMessage(); 121 assertNotNull(msg); 122 msg.setString("key1","value1"); 123 int loop; 124 for(loop=0;loop<MESSAGE_COUNT;loop++){ 125 msg.setInt("key2",loop); 126 publisher.send(msg,deliveryMode,deliveryPriority,Message.DEFAULT_TIME_TO_LIVE); 127 if (loop%500==0){ 128 System.out.println("Sent " + loop + " messages"); 129 } 130 } 131 this.assertEquals(loop,MESSAGE_COUNT); 132 publisher.close(); 133 session.close(); 134 connection.stop(); 135 connection.stop(); 136 }catch(JMSException ex){ 137 try{ 138 connection.close(); 139 }catch(Exception ignore){} 140 throw new AssertionFailedError("Create Subscription caught: "+ex); 141 } 142 } 143 public void test3CreateSubscription() throws Exception { 144 try{ 145 148 connection=connectionFactory.createConnection(USERNAME,DEFAULT_PASSWORD); 149 assertNotNull(connection); 150 connection.setClientID(CLIENTID); 151 connection.start(); 152 session=connection.createSession(false,javax.jms.Session.AUTO_ACKNOWLEDGE); 153 assertNotNull(session); 154 topic=session.createTopic(TOPIC_NAME); 155 assertNotNull(topic); 156 subscriber=session.createDurableSubscriber(topic,SUBID,"",false); 157 assertNotNull(subscriber); 158 int loop; 159 for(loop=0;loop<MESSAGE_COUNT;loop++){ 160 Message msg = subscriber.receive(); 161 if (loop%500==0){ 162 System.out.println("Received " + loop + " messages"); 163 } 164 } 165 this.assertEquals(loop,MESSAGE_COUNT); 166 subscriber.close(); 167 session.close(); 168 connection.close(); 169 }catch(JMSException ex){ 170 try{ 171 connection.close(); 172 }catch(Exception ignore){} 173 throw new AssertionFailedError("Create Subscription caught: "+ex); 174 } 175 } 176 } 177 | Popular Tags |