1 14 15 package org.apache.activemq.bugs; 16 17 import java.io.File ; 18 import java.util.Properties ; 19 import javax.jms.BytesMessage ; 20 import javax.jms.Connection ; 21 import javax.jms.Destination ; 22 import javax.jms.Message ; 23 import javax.jms.MessageConsumer ; 24 import javax.jms.MessageProducer ; 25 import javax.jms.Session ; 26 import javax.jms.Topic ; 27 import javax.jms.TopicSubscriber ; 28 import org.apache.activemq.ActiveMQConnectionFactory; 29 import org.apache.activemq.broker.BrokerService; 30 import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter; 31 import org.apache.activemq.store.kahadaptor.KahaPersistenceAdapter; 32 import org.apache.activemq.test.JmsTopicSendReceiveTest; 33 34 37 public class JmsDurableTopicSlowReceiveTest extends JmsTopicSendReceiveTest{ 38 39 private static final org.apache.commons.logging.Log log=org.apache.commons.logging.LogFactory 40 .getLog(JmsDurableTopicSlowReceiveTest.class); 41 protected Connection connection2; 42 protected Session session2; 43 protected Session consumeSession2; 44 protected MessageConsumer consumer2; 45 protected MessageProducer producer2; 46 protected Destination consumerDestination2; 47 BrokerService broker; 48 final int NMSG=100; 49 final int MSIZE=256000; 50 private Connection connection3; 51 private Session consumeSession3; 52 private TopicSubscriber consumer3; 53 private final String countProperyName = "count"; 54 55 60 protected void setUp() throws Exception { 61 this.durable=true; 62 broker=createBroker(); 63 super.setUp(); 64 } 65 66 protected void tearDown() throws Exception { 67 super.tearDown(); 68 broker.stop(); 69 } 70 71 protected ActiveMQConnectionFactory createConnectionFactory() throws Exception { 72 ActiveMQConnectionFactory result=new ActiveMQConnectionFactory("vm://localhost?async=false"); 73 Properties props=new Properties (); 74 props.put("prefetchPolicy.durableTopicPrefetch","5"); 75 props.put("prefetchPolicy.optimizeDurableTopicPrefetch","5"); 76 result.setProperties(props); 77 return result; 78 } 79 80 protected BrokerService createBroker() throws Exception { 81 BrokerService answer=new BrokerService(); 82 configureBroker(answer); 83 answer.start(); 84 return answer; 85 } 86 87 protected void configureBroker(BrokerService answer) throws Exception { 88 answer.setDeleteAllMessagesOnStartup(true); 89 } 90 91 96 public void testSlowReceiver() throws Exception { 97 connection2=createConnection(); 98 connection2.setClientID("test"); 99 connection2.start(); 100 consumeSession2=connection2.createSession(false,Session.AUTO_ACKNOWLEDGE); 101 session2=connection2.createSession(false,Session.AUTO_ACKNOWLEDGE); 102 consumerDestination2=session2.createTopic(getConsumerSubject()+"2"); 103 consumer2=consumeSession2.createDurableSubscriber((Topic )consumerDestination2,getName()); 104 105 consumer2.close(); 106 connection2.close(); 107 new Thread (new Runnable (){ 108 109 public void run(){ 110 try{ 111 int count = 0; 112 for(int loop=0;loop<4;loop++){ 113 connection2=createConnection(); 114 connection2.start(); 115 session2=connection2.createSession(false,Session.AUTO_ACKNOWLEDGE); 116 producer2=session2.createProducer(null); 117 producer2.setDeliveryMode(deliveryMode); 118 Thread.sleep(1000); 119 for(int i=0;i<NMSG/4;i++){ 120 BytesMessage message=session2.createBytesMessage(); 121 message.writeBytes(new byte[MSIZE]); 122 message.setStringProperty("test","test"); 123 message.setIntProperty(countProperyName,count); 124 message.setJMSType("test"); 125 producer2.send(consumerDestination2,message); 126 Thread.sleep(50); 127 if(verbose){ 128 System.out.println("Sent("+loop+"): "+i); 129 } 130 count++; 131 } 132 producer2.close(); 133 connection2.stop(); 134 connection2.close(); 135 } 136 }catch(Throwable e){ 137 e.printStackTrace(); 138 } 139 } 140 },"SENDER Thread").start(); 141 connection3=createConnection(); 142 connection3.setClientID("test"); 143 connection3.start(); 144 consumeSession3=connection3.createSession(false,Session.CLIENT_ACKNOWLEDGE); 145 consumer3=consumeSession3.createDurableSubscriber((Topic )consumerDestination2,getName()); 146 connection3.close(); 147 int count =0; 148 for(int loop=0;loop<4;++loop){ 149 connection3=createConnection(); 150 connection3.setClientID("test"); 151 connection3.start(); 152 consumeSession3=connection3.createSession(false,Session.CLIENT_ACKNOWLEDGE); 153 consumer3=consumeSession3.createDurableSubscriber((Topic )consumerDestination2,getName()); 154 Message msg=null; 155 int i; 156 for(i=0;i<NMSG/4;i++){ 157 msg=consumer3.receive(10000); 158 if(msg==null) 159 break; 160 if(verbose) { 161 System.out.println("Received("+loop+"): "+i + " count = " + msg.getIntProperty(countProperyName)); 162 } 163 assertNotNull(msg); 164 assertEquals(msg.getJMSType(),"test"); 165 assertEquals(msg.getStringProperty("test"),"test"); 166 assertEquals("Messages received out of order",count,msg.getIntProperty(countProperyName)); 167 Thread.sleep(500); 168 msg.acknowledge(); 169 count++; 170 } 171 consumer3.close(); 172 assertEquals("Receiver "+loop,NMSG/4,i); 173 connection3.close(); 174 } 175 } 176 } 177 | Popular Tags |