package org.apache.activemq;

import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;

import junit.framework.TestCase;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/**
 * Exercises {@link Session#recover()} against queues and topics, consuming
 * either synchronously via {@code receive} or asynchronously via a
 * {@link MessageListener}.
 *
 * <p>Every scenario sends two text messages ("First" and "Second"), calls
 * {@code recover()} after "Second" has been delivered once, and asserts that
 * "Second" is delivered again with its redelivered flag set.
 */
public class JmsSessionRecoverTest extends TestCase {

    /**
     * Maximum time, in seconds, the asynchronous scenarios wait for the
     * listener to signal completion before the test is failed.
     */
    private static final long ASYNC_TIMEOUT_SECONDS = 5;

    private Connection connection;
    private ActiveMQConnectionFactory factory;
    private Destination dest;

    /**
     * Creates the connection factory and a fresh (not yet started) connection.
     *
     * @throws Exception if the connection cannot be created
     */
    @Override
    protected void setUp() throws Exception {
        factory = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
        connection = factory.createConnection();
    }

    /**
     * Closes the connection opened by {@link #setUp()}, if any.
     *
     * @throws Exception if closing the connection fails
     */
    @Override
    protected void tearDown() throws Exception {
        if (connection != null) {
            connection.close();
            connection = null;
        }
    }

    /**
     * Runs the synchronous recover scenario against a uniquely named queue.
     *
     * @throws JMSException on any JMS failure
     * @throws InterruptedException declared for symmetry with the asynchronous tests
     */
    public void testQueueSynchRecover() throws JMSException, InterruptedException {
        dest = new ActiveMQQueue("Queue-" + System.currentTimeMillis());
        doTestSynchRecover();
    }

    /**
     * Runs the asynchronous (client-acknowledge) recover scenario against a uniquely named queue.
     *
     * @throws JMSException on any JMS failure
     * @throws InterruptedException if interrupted while waiting for the listener
     */
    public void testQueueAsynchRecover() throws JMSException, InterruptedException {
        dest = new ActiveMQQueue("Queue-" + System.currentTimeMillis());
        doTestAsynchRecover();
    }

    /**
     * Runs the synchronous recover scenario against a uniquely named topic.
     *
     * @throws JMSException on any JMS failure
     * @throws InterruptedException declared for symmetry with the asynchronous tests
     */
    public void testTopicSynchRecover() throws JMSException, InterruptedException {
        dest = new ActiveMQTopic("Topic-" + System.currentTimeMillis());
        doTestSynchRecover();
    }

    /**
     * Runs the asynchronous (client-acknowledge) recover scenario against a uniquely named topic.
     *
     * @throws JMSException on any JMS failure
     * @throws InterruptedException if interrupted while waiting for the listener
     */
    public void testTopicAsynchRecover() throws JMSException, InterruptedException {
        dest = new ActiveMQTopic("Topic-" + System.currentTimeMillis());
        doTestAsynchRecover();
    }

    /**
     * Runs the asynchronous auto-acknowledge recover scenario against a uniquely named queue.
     *
     * @throws JMSException on any JMS failure
     * @throws InterruptedException if interrupted while waiting for the listener
     */
    public void testQueueAsynchRecoverWithAutoAck() throws JMSException, InterruptedException {
        dest = new ActiveMQQueue("Queue-" + System.currentTimeMillis());
        doTestAsynchRecoverWithAutoAck();
    }

    /**
     * Runs the asynchronous auto-acknowledge recover scenario against a uniquely named topic.
     *
     * @throws JMSException on any JMS failure
     * @throws InterruptedException if interrupted while waiting for the listener
     */
    public void testTopicAsynchRecoverWithAutoAck() throws JMSException, InterruptedException {
        dest = new ActiveMQTopic("Topic-" + System.currentTimeMillis());
        doTestAsynchRecoverWithAutoAck();
    }

    /**
     * Synchronous scenario on a {@code CLIENT_ACKNOWLEDGE} session: receives and
     * acknowledges "First", receives "Second" without acknowledging it, calls
     * {@code recover()}, and asserts that "Second" arrives again flagged as
     * redelivered.
     *
     * @throws JMSException on any JMS failure
     */
    public void doTestSynchRecover() throws JMSException {
        Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
        MessageConsumer consumer = session.createConsumer(dest);
        connection.start();

        sendFirstAndSecond(session);

        TextMessage message = (TextMessage) consumer.receive(1000);
        // receive(timeout) returns null on timeout; fail with a clear message instead of an NPE.
        assertNotNull("Timed out waiting for the first message", message);
        assertEquals("First", message.getText());
        assertFalse(message.getJMSRedelivered());
        message.acknowledge();

        message = (TextMessage) consumer.receive(1000);
        assertNotNull("Timed out waiting for the second message", message);
        assertEquals("Second", message.getText());
        assertFalse(message.getJMSRedelivered());

        // "Second" has not been acknowledged, so recover() should make it available again.
        session.recover();

        message = (TextMessage) consumer.receive(2000);
        assertNotNull("Timed out waiting for redelivery of the second message", message);
        assertEquals("Second", message.getText());
        assertTrue(message.getJMSRedelivered());

        message.acknowledge();
    }

    /**
     * Asynchronous scenario on a {@code CLIENT_ACKNOWLEDGE} session: the listener
     * acknowledges "First", calls {@code recover()} on the first delivery of
     * "Second", and acknowledges the redelivered "Second".
     *
     * @throws JMSException on any JMS failure
     * @throws InterruptedException if interrupted while waiting for the listener
     */
    public void doTestAsynchRecover() throws JMSException, InterruptedException {
        runAsynchRecover(true);
    }

    /**
     * Asynchronous scenario on an {@code AUTO_ACKNOWLEDGE} session: the listener
     * never acknowledges explicitly, calls {@code recover()} on the first
     * delivery of "Second", and expects "Second" to be redelivered.
     *
     * @throws JMSException on any JMS failure
     * @throws InterruptedException if interrupted while waiting for the listener
     */
    public void doTestAsynchRecoverWithAutoAck() throws JMSException, InterruptedException {
        runAsynchRecover(false);
    }

    /**
     * Shared body of the asynchronous scenarios.
     *
     * <p>Messages are sent and the listener is installed before the connection
     * is started, so delivery only begins once the listener is in place.
     *
     * @param clientAck {@code true} to use a {@code CLIENT_ACKNOWLEDGE} session and
     *        acknowledge explicitly in the listener; {@code false} to use an
     *        {@code AUTO_ACKNOWLEDGE} session with no explicit acknowledgements
     * @throws JMSException on any JMS failure
     * @throws InterruptedException if interrupted while waiting for the listener
     */
    private void runAsynchRecover(final boolean clientAck) throws JMSException, InterruptedException {
        final int ackMode = clientAck ? Session.CLIENT_ACKNOWLEDGE : Session.AUTO_ACKNOWLEDGE;
        final Session session = connection.createSession(false, ackMode);
        // Single-element array so the anonymous listener can report a failure to this thread.
        final String[] errorMessage = new String[] {null};
        final CountDownLatch done = new CountDownLatch(1);

        MessageConsumer consumer = session.createConsumer(dest);
        sendFirstAndSecond(session);

        consumer.setMessageListener(new MessageListener() {
            private int counter;

            public void onMessage(Message msg) {
                counter++;
                try {
                    TextMessage message = (TextMessage) msg;
                    switch (counter) {
                    case 1:
                        assertEquals("First", message.getText());
                        assertFalse(message.getJMSRedelivered());
                        if (clientAck) {
                            message.acknowledge();
                        }
                        break;

                    case 2:
                        assertEquals("Second", message.getText());
                        assertFalse(message.getJMSRedelivered());
                        session.recover();
                        break;

                    case 3:
                        assertEquals("Second", message.getText());
                        assertTrue(message.getJMSRedelivered());
                        if (clientAck) {
                            message.acknowledge();
                        }
                        done.countDown();
                        break;

                    default:
                        errorMessage[0] = "Got too many messages: " + counter;
                        done.countDown();
                        break;
                    }
                } catch (Throwable e) {
                    // Throwable (not Exception) so that JUnit assertion failures raised on the
                    // delivery thread are captured and reported by the test thread.
                    e.printStackTrace();
                    errorMessage[0] = "Got exception: " + e;
                    done.countDown();
                }
            }
        });
        connection.start();

        if (!done.await(ASYNC_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
            fail("Timeout waiting for async message delivery to complete.");
        }
        if (errorMessage[0] != null) {
            fail(errorMessage[0]);
        }
    }

    /**
     * Sends the two non-persistent text messages "First" and "Second" to {@code dest}.
     *
     * @param session session used to create the producer and the messages
     * @throws JMSException if creating the producer or sending fails
     */
    private void sendFirstAndSecond(Session session) throws JMSException {
        MessageProducer producer = session.createProducer(dest);
        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
        producer.send(session.createTextMessage("First"));
        producer.send(session.createTextMessage("Second"));
    }
}