1 17 package org.apache.activemq; 18 19 import org.apache.activemq.command.ActiveMQQueue; 20 import org.apache.activemq.spring.SpringConsumer; 21 import org.apache.commons.logging.Log; 22 import org.apache.commons.logging.LogFactory; 23 24 import javax.jms.Connection ; 25 import javax.jms.JMSException ; 26 import javax.jms.Message ; 27 import javax.jms.MessageConsumer ; 28 import javax.jms.MessageListener ; 29 import javax.jms.MessageProducer ; 30 import javax.jms.Queue ; 31 import javax.jms.Session ; 32 import javax.jms.TextMessage ; 33 34 38 public class ZeroPrefetchConsumerTest extends EmbeddedBrokerTestSupport { 39 40 private static final Log log = LogFactory.getLog(ZeroPrefetchConsumerTest.class); 41 42 protected Connection connection; 43 protected Queue queue; 44 45 public void testCannotUseMessageListener() throws Exception { 46 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 47 MessageConsumer consumer = session.createConsumer(queue); 48 49 MessageListener listener = new SpringConsumer(); 50 try { 51 consumer.setMessageListener(listener); 52 fail("Should have thrown JMSException as we cannot use MessageListener with zero prefetch"); 53 } 54 catch (JMSException e) { 55 log.info("Received expected exception : " + e); 56 } 57 } 58 59 public void testPullConsumerWorks() throws Exception { 60 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 61 62 MessageProducer producer = session.createProducer(queue); 63 producer.send(session.createTextMessage("Hello World!")); 64 65 MessageConsumer consumer = session.createConsumer(queue); 67 Message answer = consumer.receive(5000); 68 assertNotNull("Should have received a message!", answer); 69 answer = consumer.receive(1); 71 assertNull("Should have not received a message!", answer); 72 answer = consumer.receiveNoWait(); 73 assertNull("Should have not received a message!", answer); 74 } 75 76 public void testIdleConsumer() throws Exception { 77 doTestIdleConsumer(false); 78 } 79 80 public void testIdleConsumerTranscated() throws Exception { 81 doTestIdleConsumer(true); 82 } 83 84 private void doTestIdleConsumer(boolean transacted) throws Exception { 85 Session session = connection.createSession(transacted, Session.AUTO_ACKNOWLEDGE); 86 87 MessageProducer producer = session.createProducer(queue); 88 producer.send(session.createTextMessage("Msg1")); 89 producer.send(session.createTextMessage("Msg2")); 90 if(transacted) { 91 session.commit(); 92 } 93 MessageConsumer consumer = session.createConsumer(queue); 95 MessageConsumer idleConsumer = session.createConsumer(queue); 97 TextMessage answer = (TextMessage ) consumer.receive(5000); 98 assertEquals("Should have received a message!", answer.getText(), "Msg1"); 99 if(transacted) { 100 session.commit(); 101 } 102 answer = (TextMessage ) consumer.receive(5000); 104 assertEquals("Should have received a message!", answer.getText(), "Msg2"); 105 if(transacted) { 106 session.commit(); 107 } 108 answer = (TextMessage ) consumer.receiveNoWait(); 109 assertNull("Should have not received a message!", answer); 110 } 111 112 public void testRecvRecvCommit() throws Exception { 113 doTestRecvRecvCommit(false); 114 } 115 116 public void testRecvRecvCommitTranscated() throws Exception { 117 doTestRecvRecvCommit(true); 118 } 119 120 private void doTestRecvRecvCommit(boolean transacted) throws Exception { 121 Session session = connection.createSession(transacted, Session.AUTO_ACKNOWLEDGE); 122 123 MessageProducer producer = session.createProducer(queue); 124 producer.send(session.createTextMessage("Msg1")); 125 producer.send(session.createTextMessage("Msg2")); 126 if(transacted) { 127 session.commit(); 128 } 129 MessageConsumer consumer = session.createConsumer(queue); 131 TextMessage answer = (TextMessage ) consumer.receiveNoWait(); 132 assertEquals("Should have received a message!", answer.getText(), "Msg1"); 133 answer = (TextMessage ) consumer.receiveNoWait(); 134 assertEquals("Should have received a message!", answer.getText(), "Msg2"); 135 if(transacted) { 136 session.commit(); 137 } 138 answer = (TextMessage ) consumer.receiveNoWait(); 139 assertNull("Should have not received a message!", answer); 140 } 141 142 protected void setUp() throws Exception { 143 bindAddress = "tcp://localhost:61616"; 144 super.setUp(); 145 146 connection = createConnection(); 147 connection.start(); 148 queue = createQueue(); 149 } 150 151 protected void tearDown() throws Exception { 152 connection.close(); 153 super.tearDown(); 154 } 155 156 protected Queue createQueue() { 157 return new ActiveMQQueue(getDestinationString() + "?consumer.prefetchSize=0"); 158 } 159 160 } 161 | Popular Tags |