1 18 package org.apache.activemq; 19 20 import javax.jms.DeliveryMode ; 21 import javax.jms.Message ; 22 import javax.jms.MessageConsumer ; 23 import javax.jms.MessageProducer ; 24 import javax.jms.Session ; 25 26 import junit.framework.Test; 27 28 import org.apache.activemq.command.ActiveMQQueue; 29 30 35 public class JMSExclusiveConsumerTest extends JmsTestSupport { 36 37 public static Test suite() { 38 return suite(JMSExclusiveConsumerTest.class); 39 } 40 41 public static void main(String [] args) { 42 junit.textui.TestRunner.run(suite()); 43 } 44 45 public int deliveryMode; 46 47 public void initCombosForTestRoundRobinDispatchOnNonExclusive() { 48 addCombinationValues("deliveryMode", new Object [] { new Integer (DeliveryMode.NON_PERSISTENT), 49 new Integer (DeliveryMode.PERSISTENT) }); 50 } 51 52 57 public void testRoundRobinDispatchOnNonExclusive() throws Exception { 58 59 connection.start(); 61 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 62 ActiveMQQueue destination = new ActiveMQQueue("TEST"); 63 MessageProducer producer = session.createProducer(destination); 64 producer.setDeliveryMode(deliveryMode); 65 66 MessageConsumer consumer1 = session.createConsumer(destination); 67 MessageConsumer consumer2 = session.createConsumer(destination); 68 69 producer.send(session.createTextMessage("1st")); 71 producer.send(session.createTextMessage("2nd")); 72 73 Message m; 74 m = consumer2.receive(1000); 75 assertNotNull(m); 76 77 m = consumer1.receive(1000); 78 assertNotNull(m); 79 80 assertNull(consumer1.receiveNoWait()); 81 assertNull(consumer2.receiveNoWait()); 82 } 83 84 public void initCombosForTestDispatchExclusive() { 85 addCombinationValues("deliveryMode", new Object [] { new Integer (DeliveryMode.NON_PERSISTENT), 86 new Integer (DeliveryMode.PERSISTENT) }); 87 } 88 89 95 public void testDispatchExclusive() throws Exception { 96 97 connection.start(); 99 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 100 ActiveMQQueue destination = new ActiveMQQueue("TEST?consumer.exclusive=true"); 101 MessageProducer producer = session.createProducer(destination); 102 producer.setDeliveryMode(deliveryMode); 103 104 MessageConsumer consumer1 = session.createConsumer(destination); 105 MessageConsumer consumer2 = session.createConsumer(destination); 106 107 producer.send(session.createTextMessage("1st")); 109 producer.send(session.createTextMessage("2nd")); 110 producer.send(session.createTextMessage("3nd")); 111 112 Message m; 113 m = consumer2.receive(1000); 114 if( m!=null ) { 115 for (int i = 0; i < 2; i++) { 117 m = consumer2.receive(1000); 118 assertNotNull(m); 119 } 120 } else { 121 for (int i = 0; i < 3; i++) { 123 m = consumer1.receive(1000); 124 assertNotNull(m); 125 } 126 } 127 128 assertNull(consumer1.receiveNoWait()); 129 assertNull(consumer2.receiveNoWait()); 130 } 131 132 public void testMixExclusiveWithNonExclusive() throws Exception { 133 ActiveMQQueue exclusiveQueue = new ActiveMQQueue("TEST.FOO?consumer.exclusive=true"); 134 ActiveMQQueue nonExclusiveQueue = new ActiveMQQueue("TEST.FOO?consumer.exclusive=false"); 135 136 connection.start(); 137 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 138 139 MessageConsumer nonExCon = session.createConsumer(nonExclusiveQueue); 140 MessageConsumer exCon = session.createConsumer(exclusiveQueue); 141 142 143 MessageProducer prod = session.createProducer(exclusiveQueue); 144 prod.send(session.createMessage()); 145 prod.send(session.createMessage()); 146 prod.send(session.createMessage()); 147 148 Message m; 149 for (int i=0; i<3; i++) { 150 m = exCon.receive(1000); 151 assertNotNull(m); 152 m = nonExCon.receive(1000); 153 assertNull(m); 154 } 155 } 156 } 157 | Popular Tags |