1 18 package org.apache.activemq.usecases; 19 20 import javax.jms.Connection ; 21 import javax.jms.DeliveryMode ; 22 import javax.jms.Destination ; 23 import javax.jms.JMSException ; 24 import javax.jms.Message ; 25 import javax.jms.MessageConsumer ; 26 import javax.jms.MessageProducer ; 27 import javax.jms.Session ; 28 import javax.jms.TextMessage ; 29 import javax.jms.Topic ; 30 import javax.jms.TopicSubscriber ; 31 32 import org.apache.activemq.ActiveMQConnectionFactory; 33 import org.apache.activemq.TestSupport; 34 import org.apache.activemq.broker.BrokerService; 35 import org.apache.activemq.store.PersistenceAdapter; 36 37 40 abstract public class DurableSubscriptionTestSupport extends TestSupport { 41 42 private Connection connection; 43 private Session session; 44 private TopicSubscriber consumer; 45 private MessageProducer producer; 46 private BrokerService broker; 47 48 protected ActiveMQConnectionFactory createConnectionFactory() throws Exception { 49 return new ActiveMQConnectionFactory("vm://durable-broker"); 50 } 51 52 protected Connection createConnection() throws Exception { 53 Connection rc = super.createConnection(); 54 rc.setClientID(getName()); 55 return rc; 56 } 57 58 protected void setUp() throws Exception { 59 createBroker(); 60 super.setUp(); 61 } 62 protected void tearDown() throws Exception { 63 super.tearDown(); 64 destroyBroker(); 65 } 66 protected void restartBroker() throws Exception { 67 destroyBroker(); 68 createRestartedBroker(); } 70 private void createBroker() throws Exception { 71 try { 72 broker = new BrokerService(); 73 broker.setBrokerName("durable-broker"); 74 broker.setDeleteAllMessagesOnStartup(true); 75 broker.setPersistenceAdapter(createPersistenceAdapter()); 76 broker.setPersistent(true); 77 broker.start(); 78 } catch (Exception e) { 79 e.printStackTrace(); 80 } 81 82 connection = createConnection(); 83 } 84 85 private void createRestartedBroker() throws Exception { 86 try { 87 broker = new BrokerService(); 88 broker.setBrokerName("durable-broker"); 89 broker.setDeleteAllMessagesOnStartup(false); 90 broker.setPersistenceAdapter(createPersistenceAdapter()); 91 broker.setPersistent(true); 92 broker.start(); 93 94 } catch (Exception e) { 95 e.printStackTrace(); 96 } 97 98 connection = createConnection(); 99 } 100 private void destroyBroker() throws Exception { 101 if( connection != null ) 102 connection.close(); 103 if( broker!=null ) 104 broker.stop(); 105 } 106 107 abstract protected PersistenceAdapter createPersistenceAdapter() throws Exception ; 108 109 110 public void XtestUnsubscribeSubscription() throws Exception { 111 session = connection.createSession(false, javax.jms.Session.AUTO_ACKNOWLEDGE); 112 Topic topic = session.createTopic("TestTopic"); 113 consumer = session.createDurableSubscriber(topic, "sub1"); 114 producer = session.createProducer(topic); 115 producer.setDeliveryMode(DeliveryMode.PERSISTENT); 116 connection.start(); 117 118 producer.send(session.createTextMessage("Msg:1")); 120 assertTextMessageEquals("Msg:1", consumer.receive(5000)); 121 122 consumer.close(); 124 producer.send(session.createTextMessage("Msg:2")); 126 session.unsubscribe("sub1"); 127 128 connection.close(); 130 connection = createConnection(); 131 session = connection.createSession(false, javax.jms.Session.AUTO_ACKNOWLEDGE); 132 producer = session.createProducer(topic); 133 connection.start(); 134 135 consumer = session.createDurableSubscriber(topic, "sub1"); 137 producer.send(session.createTextMessage("Msg:3")); 138 139 assertTextMessageEquals("Msg:3", consumer.receive(5000)); 141 } 142 143 public void XtestInactiveDurableSubscriptionTwoConnections() throws Exception { 144 session = connection.createSession(false, javax.jms.Session.AUTO_ACKNOWLEDGE); 145 Topic topic = session.createTopic("TestTopic"); 146 consumer = session.createDurableSubscriber(topic, "sub1"); 147 producer = session.createProducer(topic); 148 producer.setDeliveryMode(DeliveryMode.PERSISTENT); 149 connection.start(); 150 151 producer.send(session.createTextMessage("Msg:1")); 153 assertTextMessageEquals("Msg:1", consumer.receive(5000)); 154 155 consumer.close(); 157 158 producer.send(session.createTextMessage("Msg:2")); 160 161 connection.close(); 163 connection = createConnection(); 164 session = connection.createSession(false, javax.jms.Session.AUTO_ACKNOWLEDGE); 165 connection.start(); 166 167 consumer = session.createDurableSubscriber(topic, "sub1"); 169 170 assertTextMessageEquals("Msg:2", consumer.receive(5000)); 172 } 173 174 public void XtestInactiveDurableSubscriptionBrokerRestart() throws Exception { 175 session = connection.createSession(false, javax.jms.Session.AUTO_ACKNOWLEDGE); 176 Topic topic = session.createTopic("TestTopic"); 177 consumer = session.createDurableSubscriber(topic, "sub1"); 178 producer = session.createProducer(topic); 179 producer.setDeliveryMode(DeliveryMode.PERSISTENT); 180 connection.start(); 181 182 producer.send(session.createTextMessage("Msg:1")); 184 assertTextMessageEquals("Msg:1", consumer.receive(5000)); 185 186 consumer.close(); 188 189 producer.send(session.createTextMessage("Msg:2")); 191 192 restartBroker(); 194 session = connection.createSession(false, javax.jms.Session.AUTO_ACKNOWLEDGE); 195 connection.start(); 196 197 consumer = session.createDurableSubscriber(topic, "sub1"); 199 200 assertTextMessageEquals("Msg:2", consumer.receive(5000)); 202 assertNull(consumer.receive(5000)); 203 } 204 205 public void testDurableSubscriptionPersistsPastBrokerRestart() throws Exception { 206 207 connection.start(); 209 session = connection.createSession(false, javax.jms.Session.AUTO_ACKNOWLEDGE); 210 211 Topic topic = session.createTopic("TestTopic?consumer.retroactive=true"); 213 consumer = session.createDurableSubscriber(topic, "sub1"); 214 215 restartBroker(); 217 218 connection.start(); 220 session = connection.createSession(false, javax.jms.Session.AUTO_ACKNOWLEDGE); 221 producer = session.createProducer(topic); 222 producer.setDeliveryMode(DeliveryMode.PERSISTENT); 223 224 producer.send(session.createTextMessage("Msg:1")); 226 227 consumer = session.createDurableSubscriber(topic, "sub1"); 229 230 producer.send(session.createTextMessage("Msg:2")); 232 233 234 assertTextMessageEquals("Msg:1", consumer.receive(5000)); 236 assertTextMessageEquals("Msg:2", consumer.receive(5000)); 237 238 assertNull(consumer.receive(5000)); 239 } 240 241 public void XtestInactiveDurableSubscriptionOneConnection() throws Exception { 242 session = connection.createSession(false, javax.jms.Session.AUTO_ACKNOWLEDGE); 243 Topic topic = session.createTopic("TestTopic"); 244 consumer = session.createDurableSubscriber(topic, "sub1"); 245 producer = session.createProducer(topic); 246 producer.setDeliveryMode(DeliveryMode.PERSISTENT); 247 connection.start(); 248 249 producer.send(session.createTextMessage("Msg:1")); 251 assertTextMessageEquals("Msg:1", consumer.receive(5000)); 252 253 consumer.close(); 255 256 producer.send(session.createTextMessage("Msg:2")); 258 259 consumer = session.createDurableSubscriber(topic, "sub1"); 261 262 assertTextMessageEquals("Msg:2", consumer.receive(5000)); 264 } 265 266 public void XtestSelectorChange() throws Exception { 267 session = connection.createSession(false, javax.jms.Session.AUTO_ACKNOWLEDGE); 268 Topic topic = session.createTopic("TestTopic"); 269 consumer = session.createDurableSubscriber(topic, "sub1", "color='red'", false); 270 producer = session.createProducer(topic); 271 producer.setDeliveryMode(DeliveryMode.PERSISTENT); 272 connection.start(); 273 274 TextMessage msg = session.createTextMessage(); 276 msg.setText("Msg:1"); 277 msg.setStringProperty("color", "blue"); 278 producer.send(msg); 279 msg.setText("Msg:2"); 280 msg.setStringProperty("color", "red"); 281 producer.send(msg); 282 283 assertTextMessageEquals("Msg:2", consumer.receive(5000)); 284 285 consumer.close(); 287 consumer = session.createDurableSubscriber(topic, "sub1", "color='blue'", false); 288 289 msg.setText("Msg:3"); 291 msg.setStringProperty("color", "red"); 292 producer.send(msg); 293 msg.setText("Msg:4"); 294 msg.setStringProperty("color", "blue"); 295 producer.send(msg); 296 297 assertTextMessageEquals("Msg:4", consumer.receive(5000)); 299 } 300 301 302 public void XtestDurableSubWorksInNewSession() throws JMSException { 303 304 connection.start(); 306 Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); 307 Topic topic = session.createTopic("topic-"+getName()); 308 MessageConsumer consumer = session.createDurableSubscriber(topic, "sub1"); 309 while( consumer.receive(1000)!=null ) 311 ; 312 313 session.close(); 315 session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); 316 317 MessageProducer producer = createProducer(session, topic); 319 producer.send(session.createTextMessage("Message 1")); 320 321 consumer = session.createDurableSubscriber(topic, "sub1"); 323 Message msg = consumer.receive(1000); 324 assertNotNull(msg); 325 assertEquals( "Message 1", ((TextMessage )msg).getText() ); 326 327 } 328 329 330 public void XtestDurableSubWorksInNewConnection() throws Exception { 331 332 connection.start(); 334 Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); 335 Topic topic = session.createTopic("topic-"+getName()); 336 MessageConsumer consumer = session.createDurableSubscriber(topic, "sub1"); 337 while( consumer.receive(1000)!=null ) 339 ; 340 341 connection.close(); 345 connection = createConnection(); 346 connection.start(); 347 session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); 348 349 MessageProducer producer = createProducer(session, topic); 351 producer.send(session.createTextMessage("Message 1")); 352 353 consumer = session.createDurableSubscriber(topic, "sub1"); 355 Message msg = consumer.receive(1000); 356 assertNotNull(msg); 357 assertEquals( "Message 1", ((TextMessage )msg).getText() ); 358 359 } 360 361 private MessageProducer createProducer(Session session, Destination queue) throws JMSException { 362 MessageProducer producer = session.createProducer(queue); 363 producer.setDeliveryMode(getDeliveryMode()); 364 return producer; 365 } 366 367 protected int getDeliveryMode() { 368 return DeliveryMode.PERSISTENT; 369 } 370 private void assertTextMessageEquals(String string, Message message) throws JMSException { 371 assertNotNull("Message was null", message); 372 assertTrue("Message is not a TextMessage", message instanceof TextMessage ); 373 assertEquals(string, ((TextMessage )message).getText()); 374 } 375 376 } 377 | Popular Tags |