package org.apache.activemq;

import java.util.ArrayList;
import java.util.Enumeration;

import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.QueueBrowser;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.test.JmsResourceProvider;

/**
 * Runs the transaction test scenarios against a queue destination.
 *
 * The fixtures used here without declaration ({@code session},
 * {@code connection}, {@code producer}, {@code consumer}, {@code destination},
 * {@code resourceProvider}) and the helpers {@code reconnect()} and
 * {@code assertTextMessagesEqual(...)} are not defined in this file; they are
 * presumably inherited from {@link JmsTransactionTestSupport}.
 */
public class JmsQueueTransactionTest extends JmsTransactionTestSupport {
    private static final org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory
        .getLog(JmsQueueTransactionTest.class);

    /**
     * Builds the resource provider used by this test, with topics switched off
     * so that the tests run against a queue.
     *
     * @return a new provider configured with {@code setTopic(false)}
     */
    protected JmsResourceProvider getJmsResourceProvider() {
        JmsResourceProvider p = new JmsResourceProvider();
        p.setTopic(false);
        return p;
    }

    /**
     * Sends two messages and commits, receives both without committing, then
     * calls {@code reconnect()} and expects both messages to be delivered
     * again.
     *
     * @throws Exception if any JMS operation fails
     */
    public void testReceiveTwoThenCloseConnection() throws Exception {
        Message[] outbound = new Message[] {
            session.createTextMessage("First Message"),
            session.createTextMessage("Second Message")
        };

        drainDestination();

        producer.send(outbound[0]);
        producer.send(outbound[1]);
        session.commit();

        log.info("Sent 0: " + outbound[0]);
        log.info("Sent 1: " + outbound[1]);

        // First delivery: both messages are consumed but the session is
        // deliberately NOT committed before reconnecting.
        Message message = consumer.receive(1000);
        assertNotNull(message);
        assertEquals(outbound[0], message);

        message = consumer.receive(1000);
        assertNotNull(message);
        assertEquals(outbound[1], message);

        reconnect();

        // Second delivery: the uncommitted messages must show up again.
        ArrayList redelivered = new ArrayList();

        message = consumer.receive(5000);
        assertNotNull("Should have re-received the first message again!", message);
        redelivered.add(message);
        assertEquals(outbound[0], message);

        message = consumer.receive(5000);
        assertNotNull("Should have re-received the second message again!", message);
        redelivered.add(message);
        assertEquals(outbound[1], message);
        session.commit();

        Message[] inbound = new Message[redelivered.size()];
        redelivered.toArray(inbound);

        assertTextMessagesEqual("Rollback did not work", outbound, inbound);
    }

    /**
     * Repeatedly sends one message in a dedicated session and receives it in
     * another dedicated session, committing and closing each session in turn.
     *
     * @throws Exception if any JMS operation fails
     */
    public void testSendReceiveInSeperateSessionTest() throws Exception {
        // The shared session is not used by this test; every step below runs
        // in a session of its own.
        session.close();
        int batchCount = 10;

        for (int i = 0; i < batchCount; i++) {
            String expectedText = "Test Message: " + i;

            // Send in a dedicated session. The finally block makes sure the
            // session is closed even if the send or commit fails.
            Session sendSession = resourceProvider.createSession(connection);
            try {
                MessageProducer sender = resourceProvider.createProducer(sendSession, destination);
                sender.send(sendSession.createTextMessage(expectedText));
                sendSession.commit();
            } finally {
                sendSession.close();
            }

            // Receive in a different dedicated session, closed even when an
            // assertion fails.
            Session receiveSession = resourceProvider.createSession(connection);
            try {
                MessageConsumer receiver = resourceProvider.createConsumer(receiveSession, destination);

                TextMessage message = (TextMessage) receiver.receive(1000 * 5);
                assertNotNull("Received only " + i + " messages in batch ", message);
                assertEquals(expectedText, message.getText());

                receiveSession.commit();
            } finally {
                receiveSession.close();
            }
        }
    }

    /**
     * Sends three messages, receives the first one without committing, and
     * then checks that a queue browser enumerates exactly the second and third
     * messages. Finally a fresh consumer must receive the second and third
     * messages before the session is committed.
     *
     * @throws Exception if any JMS operation fails
     */
    public void testReceiveBrowseReceive() throws Exception {
        Message[] outbound = new Message[] {
            session.createTextMessage("First Message"),
            session.createTextMessage("Second Message"),
            session.createTextMessage("Third Message")
        };

        drainDestination();

        producer.send(outbound[0]);
        producer.send(outbound[1]);
        producer.send(outbound[2]);
        session.commit();

        // Take the first message, then close the consumer before browsing.
        assertEquals(outbound[0], consumer.receive(1000));
        consumer.close();

        QueueBrowser browser = session.createBrowser((Queue) destination);
        try {
            Enumeration enumeration = browser.getEnumeration();

            assertTrue("should have received the second message", enumeration.hasMoreElements());
            assertEquals(outbound[1], (Message) enumeration.nextElement());

            assertTrue("Should have received the third message", enumeration.hasMoreElements());
            assertEquals(outbound[2], (Message) enumeration.nextElement());

            // Log every unexpected extra message before failing, so the
            // failure output shows what the browser returned.
            boolean tooMany = false;
            while (enumeration.hasMoreElements()) {
                log.info("Got extra message: " + ((TextMessage) enumeration.nextElement()).getText());
                tooMany = true;
            }
            assertFalse("Browser returned more messages than expected", tooMany);
        } finally {
            browser.close();
        }

        // A new consumer on the same session must still get messages two and three.
        consumer = resourceProvider.createConsumer(session, destination);
        assertEquals(outbound[1], consumer.receive(1000));
        assertEquals(outbound[2], consumer.receive(1000));
        consumer.close();

        session.commit();
    }

    /**
     * Receives and discards messages from the shared consumer until a
     * one-second receive returns nothing, then commits the shared session.
     *
     * @throws Exception if receiving or committing fails
     */
    private void drainDestination() throws Exception {
        while (consumer.receive(1000) != null) {
            // Discard: only emptying the destination matters here.
        }
        session.commit();
    }

}