1 18 package org.apache.activemq.usecases; 19 20 import java.util.ArrayList ; 21 import java.util.List ; 22 23 import javax.jms.Connection ; 24 import javax.jms.Destination ; 25 import javax.jms.JMSException ; 26 import javax.jms.Message ; 27 import javax.jms.MessageConsumer ; 28 import javax.jms.MessageListener ; 29 import javax.jms.MessageProducer ; 30 import javax.jms.Session ; 31 import javax.jms.TextMessage ; 32 33 import junit.framework.TestCase; 34 35 import org.apache.activemq.ActiveMQConnectionFactory; 36 import org.apache.activemq.command.ActiveMQQueue; 37 import org.apache.commons.logging.Log; 38 import org.apache.commons.logging.LogFactory; 39 40 import java.util.concurrent.CountDownLatch ; 41 42 43 49 public final class TransactionRollbackOrderTest extends TestCase { 50 private static final Log log = LogFactory.getLog(TransactionRollbackOrderTest.class); 51 52 private volatile String receivedText; 53 54 private Session producerSession; 55 private Session consumerSession; 56 private Destination queue; 57 58 private MessageProducer producer; 59 private MessageConsumer consumer; 60 private Connection connection; 61 private CountDownLatch latch = new CountDownLatch (1); 62 private int NUM_MESSAGES = 5; 63 private List msgSent = new ArrayList (); 64 private List msgCommitted = new ArrayList (); 65 private List msgRolledBack = new ArrayList (); 66 private List msgRedelivered = new ArrayList (); 67 68 public void testTransaction() throws Exception { 69 70 ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false"); 71 72 connection = factory.createConnection(); 73 queue = new ActiveMQQueue(getClass().getName() + "." + getName()); 74 75 producerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 76 consumerSession = connection.createSession(true, 0); 77 78 producer = producerSession.createProducer(queue); 79 80 consumer = consumerSession.createConsumer(queue); 81 consumer.setMessageListener(new MessageListener () { 82 83 int msgCount = 0; 84 int msgCommittedCount = 0; 85 86 public void onMessage(Message m) { 87 try { 88 msgCount++; 89 TextMessage tm = (TextMessage ) m; 90 receivedText = tm.getText(); 91 92 if (tm.getJMSRedelivered()) { 93 msgRedelivered.add(receivedText); 94 } 95 96 log.info("consumer received message: " + receivedText + (tm.getJMSRedelivered() ? " ** Redelivered **" : "")); 97 if (msgCount == 3) { 98 msgRolledBack.add(receivedText); 99 consumerSession.rollback(); 100 log.info("[msg: " + receivedText + "] ** rolled back **"); 101 } 102 else { 103 msgCommittedCount++; 104 msgCommitted.add(receivedText); 105 consumerSession.commit(); 106 log.info("[msg: " + receivedText + "] committed transaction "); 107 } 108 if (msgCommittedCount == NUM_MESSAGES) { 109 latch.countDown(); 110 } 111 } 112 catch (JMSException e) { 113 try { 114 consumerSession.rollback(); 115 log.info("rolled back transaction"); 116 } 117 catch (JMSException e1) { 118 log.info(e1); 119 e1.printStackTrace(); 120 } 121 log.info(e); 122 e.printStackTrace(); 123 } 124 } 125 }); 126 connection.start(); 127 128 TextMessage tm = null; 129 try { 130 for (int i = 1; i <= NUM_MESSAGES; i++) { 131 tm = producerSession.createTextMessage(); 132 tm.setText("Hello " + i); 133 msgSent.add(tm.getText()); 134 producer.send(tm); 135 log.info("producer sent message: " + tm.getText()); 136 } 137 } 138 catch (JMSException e) { 139 e.printStackTrace(); 140 } 141 142 log.info("Waiting for latch"); 143 latch.await(); 144 145 assertEquals(1, msgRolledBack.size()); 146 assertEquals(1, msgRedelivered.size()); 147 148 log.info("msg RolledBack = " + msgRolledBack.get(0)); 149 log.info("msg Redelivered = " + msgRedelivered.get(0)); 150 151 assertEquals(msgRolledBack.get(0), msgRedelivered.get(0)); 152 153 assertEquals(NUM_MESSAGES, msgSent.size()); 154 assertEquals(NUM_MESSAGES, msgCommitted.size()); 155 156 assertEquals(msgSent, msgCommitted); 157 158 } 159 160 protected void tearDown() throws Exception { 161 if (connection != null) { 162 log.info("Closing the connection"); 163 connection.close(); 164 } 165 super.tearDown(); 166 } 167 } 168 | Popular Tags |