1 18 19 package org.apache.activemq; 20 21 import javax.jms.Connection ; 22 import javax.jms.DeliveryMode ; 23 import javax.jms.Destination ; 24 import javax.jms.JMSException ; 25 import javax.jms.Message ; 26 import javax.jms.MessageConsumer ; 27 import javax.jms.MessageProducer ; 28 import javax.jms.Session ; 29 import javax.jms.TextMessage ; 30 import javax.jms.Topic ; 31 32 35 public class JmsTopicSelectorTest extends TestSupport { 36 private static final org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory 37 .getLog(JmsTopicSelectorTest.class); 38 39 protected Connection connection; 40 protected Session session; 41 protected MessageConsumer consumer; 42 protected MessageProducer producer; 43 protected Destination consumerDestination; 44 protected Destination producerDestination; 45 protected boolean topic = true; 46 protected boolean durable = false; 47 protected int deliveryMode = DeliveryMode.PERSISTENT; 48 49 public JmsTopicSelectorTest() { 50 super(); 51 } 52 53 public JmsTopicSelectorTest(String name) { 54 super(name); 55 } 56 57 public void setUp() throws Exception { 58 super.setUp(); 59 60 connectionFactory = createConnectionFactory(); 61 connection = createConnection(); 62 if (durable) { 63 connection.setClientID(getClass().getName()); 64 } 65 66 log.info("Created connection: " + connection); 67 68 session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 69 70 log.info("Created session: " + session); 71 72 if (topic) { 73 consumerDestination = session.createTopic(getConsumerSubject()); 74 producerDestination = session.createTopic(getProducerSubject()); 75 } else { 76 consumerDestination = session.createQueue(getConsumerSubject()); 77 producerDestination = session.createQueue(getProducerSubject()); 78 } 79 80 log.info("Created consumer destination: " + consumerDestination + " of type: " + consumerDestination.getClass()); 81 log.info("Created producer destination: " + producerDestination + " of type: " + producerDestination.getClass()); 82 producer = session.createProducer(producerDestination); 83 producer.setDeliveryMode(deliveryMode); 84 85 log.info("Created producer: " + producer + " delivery mode = " + 86 (deliveryMode == DeliveryMode.PERSISTENT ? "PERSISTENT" : "NON_PERSISTENT")); 87 connection.start(); 88 } 89 90 public void tearDown() throws Exception { 91 session.close(); 92 connection.close(); 93 } 94 95 protected MessageConsumer createConsumer(String selector) throws JMSException { 96 if (durable) { 97 log.info("Creating durable consumer"); 98 return session.createDurableSubscriber((Topic ) consumerDestination, getName(), selector, false); 99 } 100 return session.createConsumer(consumerDestination, selector); 101 } 102 103 public void sendMessages() throws Exception { 104 TextMessage message = session.createTextMessage("1"); 105 message.setIntProperty("id", 1); 106 message.setJMSType("a"); 107 message.setStringProperty("stringProperty", "a"); 108 message.setLongProperty("longProperty", 1); 109 message.setBooleanProperty("booleanProperty", true); 110 producer.send(message); 111 112 message = session.createTextMessage("2"); 113 message.setIntProperty("id", 2); 114 message.setJMSType("a"); 115 message.setStringProperty("stringProperty", "a"); 116 message.setLongProperty("longProperty", 1); 117 message.setBooleanProperty("booleanProperty", false); 118 producer.send(message); 119 120 message = session.createTextMessage("3"); 121 message.setIntProperty("id", 3); 122 message.setJMSType("a"); 123 message.setStringProperty("stringProperty", "a"); 124 message.setLongProperty("longProperty", 1); 125 message.setBooleanProperty("booleanProperty", true); 126 producer.send(message); 127 128 message = session.createTextMessage("4"); 129 message.setIntProperty("id", 4); 130 message.setJMSType("b"); 131 message.setStringProperty("stringProperty", "b"); 132 message.setLongProperty("longProperty", 2); 133 message.setBooleanProperty("booleanProperty", false); 134 producer.send(message); 135 136 message = session.createTextMessage("5"); 137 message.setIntProperty("id", 5); 138 message.setJMSType("c"); 139 message.setStringProperty("stringProperty", "c"); 140 message.setLongProperty("longProperty", 3); 141 message.setBooleanProperty("booleanProperty", true); 142 producer.send(message); 143 } 144 145 public void consumeMessages(int remaining) throws Exception { 146 consumer = createConsumer(null); 147 for (int i = 0; i < remaining; i++) { 148 consumer.receive(1000); 149 } 150 consumer.close(); 151 152 } 153 154 public void testPropertySelector() throws Exception { 155 int remaining = 5; 156 Message message = null; 157 consumer = createConsumer("stringProperty = 'a' and longProperty = 1 and booleanProperty = true"); 158 sendMessages(); 159 while (true) { 160 message = consumer.receive(1000); 161 if (message == null) { 162 break; 163 } 164 String text = ((TextMessage ) message).getText(); 165 if (!text.equals("1") && !text.equals("3")) { 166 fail("unexpected message: " + text); 167 } 168 remaining--; 169 } 170 assertEquals(remaining, 3); 171 consumer.close(); 172 consumeMessages(remaining); 173 174 } 175 176 public void testJMSPropertySelector() throws Exception { 177 int remaining = 5; 178 Message message = null; 179 consumer = createConsumer("JMSType = 'a' and stringProperty = 'a'"); 180 sendMessages(); 181 while (true) { 182 message = consumer.receive(1000); 183 if (message == null) { 184 break; 185 } 186 String text = ((TextMessage ) message).getText(); 187 if (!text.equals("1") && !text.equals("2") && !text.equals("3")) { 188 fail("unexpected message: " + text); 189 } 190 remaining--; 191 } 192 assertEquals(remaining, 2); 193 consumer.close(); 194 consumeMessages(remaining); 195 196 } 197 198 } 199 | Popular Tags |