1 18 package org.apache.activemq.test.rollback; 19 20 import javax.jms.Connection ; 21 import javax.jms.JMSException ; 22 import javax.jms.Message ; 23 import javax.jms.MessageConsumer ; 24 import javax.jms.MessageListener ; 25 import javax.jms.Session ; 26 import javax.jms.TextMessage ; 27 28 import org.apache.activemq.EmbeddedBrokerTestSupport; 29 import org.springframework.jms.core.MessageCreator; 30 31 import java.util.concurrent.CountDownLatch ; 32 import java.util.concurrent.TimeUnit ; 33 import java.util.concurrent.atomic.AtomicInteger ; 34 35 38 public class RollbacksWhileConsumingLargeQueueTest extends 39 EmbeddedBrokerTestSupport implements MessageListener { 40 41 protected int numberOfMessagesOnQueue = 6500; 42 private Connection connection; 43 private AtomicInteger deliveryCounter = new AtomicInteger (0); 44 private AtomicInteger ackCounter = new AtomicInteger (0); 45 private CountDownLatch latch; 46 private Throwable failure; 47 48 public void testWithReciever() throws Throwable { 49 latch = new CountDownLatch (numberOfMessagesOnQueue); 50 Session session = connection.createSession(true, 0); 51 MessageConsumer consumer = session.createConsumer(destination); 52 53 long start = System.currentTimeMillis(); 54 while ((System.currentTimeMillis() - start) < 1000*1000) { 55 if (getFailure() != null) { 56 throw getFailure(); 57 } 58 59 if( ackCounter.get() == numberOfMessagesOnQueue ) 61 return; 62 63 Message message = consumer.receive(1000); 64 if (message == null) 65 continue; 66 67 try { 68 onMessage(message); 69 session.commit(); 70 } catch (Throwable e) { 71 session.rollback(); 72 } 73 } 74 75 fail("Did not receive all the messages."); 76 } 77 78 public void testWithMessageListener() throws Throwable { 79 latch = new CountDownLatch (numberOfMessagesOnQueue); 80 new DelegatingTransactionalMessageListener(this, connection, 81 destination); 82 83 long start = System.currentTimeMillis(); 84 while ((System.currentTimeMillis() - start) < 1000*1000) { 85 86 if (getFailure() != null) { 87 throw getFailure(); 88 } 89 90 if (latch.await(1, TimeUnit.SECONDS)) { 91 System.out.println("Received: " + deliveryCounter.get() 92 + " message(s)"); 93 return; 94 } 95 96 } 97 98 fail("Did not receive all the messages."); 99 } 100 101 102 protected void setUp() throws Exception { 103 super.setUp(); 104 105 connection = createConnection(); 106 connection.start(); 107 108 for (int i = 0; i < numberOfMessagesOnQueue; i++) { 110 template.send(createMessageCreator(i)); 111 } 112 113 } 114 115 protected void tearDown() throws Exception { 116 if (connection != null) { 117 connection.close(); 118 } 119 super.tearDown(); 120 } 121 122 protected MessageCreator createMessageCreator(final int i) { 123 return new MessageCreator() { 124 public Message createMessage(Session session) throws JMSException { 125 TextMessage answer = session.createTextMessage("Message: " + i); 126 answer.setIntProperty("Counter", i); 127 return answer; 128 } 129 }; 130 } 131 132 public void onMessage(Message message) { 133 String msgId = null; 134 String msgText = null; 135 136 try { 137 msgId = message.getJMSMessageID(); 138 msgText = ((TextMessage ) message).getText(); 139 } catch (JMSException e) { 140 setFailure(e); 141 } 142 143 try { 144 assertEquals("Message: " + ackCounter.get(), msgText); 145 } catch (Throwable e) { 146 setFailure(e); 147 } 148 149 int value = deliveryCounter.incrementAndGet(); 150 if (value % 2 == 0) { 151 log.info("Rolling Back message: " + ackCounter.get() + " id: " + msgId + ", content: " + msgText); 152 throw new RuntimeException ("Dummy exception on message: " + value); 153 } 154 155 log.info("Received message: " + ackCounter.get() + " id: " + msgId + ", content: " + msgText); 156 ackCounter.incrementAndGet(); 157 latch.countDown(); 158 } 159 160 public synchronized Throwable getFailure() { 161 return failure; 162 } 163 164 public synchronized void setFailure(Throwable failure) { 165 this.failure = failure; 166 } 167 } 168 | Popular Tags |