1 18 package org.apache.activemq.usecases; 19 20 import javax.jms.Connection ; 21 import javax.jms.DeliveryMode ; 22 import javax.jms.Destination ; 23 import javax.jms.Message ; 24 import javax.jms.MessageConsumer ; 25 import javax.jms.MessageProducer ; 26 import javax.jms.Session ; 27 import javax.jms.TextMessage ; 28 import javax.jms.Topic ; 29 30 import org.apache.activemq.test.TestSupport; 31 import org.apache.activemq.util.IdGenerator; 32 33 36 public class TopicRedeliverTest extends TestSupport { 37 38 private static final int RECEIVE_TIMEOUT = 10000; 39 private IdGenerator idGen = new IdGenerator(); 40 protected int deliveryMode = DeliveryMode.PERSISTENT; 41 public TopicRedeliverTest(){ 42 } 43 44 public TopicRedeliverTest(String n){ 45 super(n); 46 } 47 48 protected void setup() throws Exception { 49 super.setUp(); 50 topic = true; 51 } 52 53 54 58 public void testClientAcknowledge() throws Exception { 59 Destination destination = createDestination(getClass().getName()); 60 Connection connection = createConnection(); 61 connection.setClientID(idGen.generateId()); 62 connection.start(); 63 Session consumerSession = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); 64 MessageConsumer consumer = consumerSession.createConsumer(destination); 65 Session producerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 66 MessageProducer producer = producerSession.createProducer(destination); 67 producer.setDeliveryMode(deliveryMode); 68 69 71 TextMessage sent1 = producerSession.createTextMessage(); 72 sent1.setText("msg1"); 73 producer.send(sent1); 74 75 TextMessage sent2 = producerSession.createTextMessage(); 76 sent1.setText("msg2"); 77 producer.send(sent2); 78 79 TextMessage sent3 = producerSession.createTextMessage(); 80 sent1.setText("msg3"); 81 producer.send(sent3); 82 83 Message rec1 = consumer.receive(RECEIVE_TIMEOUT); 84 Message rec2 = consumer.receive(RECEIVE_TIMEOUT); 85 Message rec3 = consumer.receive(RECEIVE_TIMEOUT); 86 87 rec2.acknowledge(); 89 90 TextMessage sent4 = producerSession.createTextMessage(); 91 sent4.setText("msg4"); 92 producer.send(sent4); 93 94 Message rec4 = consumer.receive(RECEIVE_TIMEOUT); 95 assertTrue(rec4.equals(sent4)); 96 consumerSession.recover(); 97 rec4 = consumer.receive(RECEIVE_TIMEOUT); 98 assertTrue(rec4.equals(sent4)); 99 assertTrue(rec4.getJMSRedelivered()); 100 rec4.acknowledge(); 101 connection.close(); 102 103 } 104 105 109 public void testRedilveredFlagSetOnRollback() throws Exception { 110 Destination destination = createDestination(getClass().getName()); 111 Connection connection = createConnection(); 112 connection.setClientID(idGen.generateId()); 113 connection.start(); 114 Session consumerSession = connection.createSession(true, Session.CLIENT_ACKNOWLEDGE); 115 MessageConsumer consumer = null; 116 if (topic){ 117 consumer = consumerSession.createDurableSubscriber((Topic )destination, "TESTRED"); 118 }else{ 119 consumer = consumerSession.createConsumer(destination); 120 } 121 Session producerSession = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); 122 MessageProducer producer = producerSession.createProducer(destination); 123 producer.setDeliveryMode(deliveryMode); 124 125 TextMessage sentMsg = producerSession.createTextMessage(); 126 sentMsg.setText("msg1"); 127 producer.send(sentMsg); 128 producerSession.commit(); 129 130 Message recMsg = consumer.receive(RECEIVE_TIMEOUT); 131 assertTrue(recMsg.getJMSRedelivered() == false); 132 recMsg = consumer.receive(RECEIVE_TIMEOUT); 133 consumerSession.rollback(); 134 recMsg = consumer.receive(RECEIVE_TIMEOUT); 135 assertTrue(recMsg.getJMSRedelivered()); 136 consumerSession.commit(); 137 assertTrue(recMsg.equals(sentMsg)); 138 assertTrue(recMsg.getJMSRedelivered()); 139 connection.close(); 140 } 141 142 143 147 148 public void XtestTransactionRollbackOnSessionClose() throws Exception { 149 Destination destination = createDestination(getClass().getName()); 150 Connection connection = createConnection(); 151 connection.setClientID(idGen.generateId()); 152 connection.start(); 153 Session consumerSession = connection.createSession(true, Session.CLIENT_ACKNOWLEDGE); 154 MessageConsumer consumer = null; 155 if (topic){ 156 consumer = consumerSession.createDurableSubscriber((Topic )destination, "TESTRED"); 157 }else{ 158 consumer = consumerSession.createConsumer(destination); 159 } 160 Session producerSession = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); 161 MessageProducer producer = producerSession.createProducer(destination); 162 producer.setDeliveryMode(deliveryMode); 163 164 TextMessage sentMsg = producerSession.createTextMessage(); 165 sentMsg.setText("msg1"); 166 producer.send(sentMsg); 167 168 producerSession.commit(); 169 170 Message recMsg = consumer.receive(RECEIVE_TIMEOUT); 171 assertTrue(recMsg.getJMSRedelivered() == false); 172 consumerSession.close(); 173 consumerSession = connection.createSession(true, Session.CLIENT_ACKNOWLEDGE); 174 consumer = consumerSession.createConsumer(destination); 175 176 recMsg = consumer.receive(RECEIVE_TIMEOUT); 177 consumerSession.commit(); 178 assertTrue(recMsg.equals(sentMsg)); 179 connection.close(); 180 } 181 182 186 187 public void testTransactionRollbackOnSend() throws Exception { 188 Destination destination = createDestination(getClass().getName()); 189 Connection connection = createConnection(); 190 connection.setClientID(idGen.generateId()); 191 connection.start(); 192 Session consumerSession = connection.createSession(true, Session.CLIENT_ACKNOWLEDGE); 193 MessageConsumer consumer = consumerSession.createConsumer(destination); 194 Session producerSession = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); 195 MessageProducer producer = producerSession.createProducer(destination); 196 producer.setDeliveryMode(deliveryMode); 197 198 TextMessage sentMsg = producerSession.createTextMessage(); 199 sentMsg.setText("msg1"); 200 producer.send(sentMsg); 201 producerSession.commit(); 202 203 Message recMsg = consumer.receive(RECEIVE_TIMEOUT); 204 consumerSession.commit(); 205 assertTrue(recMsg.equals(sentMsg)); 206 207 sentMsg = producerSession.createTextMessage(); 208 sentMsg.setText("msg2"); 209 producer.send(sentMsg); 210 producerSession.rollback(); 211 212 sentMsg = producerSession.createTextMessage(); 213 sentMsg.setText("msg3"); 214 producer.send(sentMsg); 215 producerSession.commit(); 216 217 recMsg = consumer.receive(RECEIVE_TIMEOUT); 218 assertTrue(recMsg.equals(sentMsg)); 219 consumerSession.commit(); 220 221 connection.close(); 222 } 223 224 } 225 | Popular Tags |