1 18 package org.apache.activemq.usecases; 19 20 import java.util.Date ; 21 22 import javax.jms.Connection ; 23 import javax.jms.Destination ; 24 import javax.jms.JMSException ; 25 import javax.jms.Message ; 26 import javax.jms.MessageConsumer ; 27 import javax.jms.MessageListener ; 28 import javax.jms.MessageProducer ; 29 import javax.jms.Session ; 30 import javax.jms.TextMessage ; 31 32 import junit.framework.TestCase; 33 34 import org.apache.activemq.ActiveMQConnectionFactory; 35 import org.apache.activemq.command.ActiveMQQueue; 36 import org.apache.commons.logging.Log; 37 import org.apache.commons.logging.LogFactory; 38 39 import java.util.concurrent.CountDownLatch ; 40 41 45 public final class QueueRepeaterTest extends TestCase { 46 47 private static final Log log = LogFactory.getLog(QueueRepeaterTest.class); 48 49 private volatile String receivedText; 50 51 private Session producerSession; 52 private Session consumerSession; 53 private Destination queue; 54 55 private MessageProducer producer; 56 private MessageConsumer consumer; 57 private Connection connection; 58 private CountDownLatch latch = new CountDownLatch (1); 59 60 public void testTransaction() throws Exception { 61 62 ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost"); 63 connection = factory.createConnection(); 64 queue = new ActiveMQQueue(getClass().getName() + "." + getName()); 65 66 producerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 67 consumerSession = connection.createSession(true, 0); 68 69 producer = producerSession.createProducer(queue); 70 71 consumer = consumerSession.createConsumer(queue); 72 consumer.setMessageListener(new MessageListener () { 73 74 public void onMessage(Message m) { 75 try { 76 TextMessage tm = (TextMessage ) m; 77 receivedText = tm.getText(); 78 latch.countDown(); 79 80 log.info("consumer received message :" + receivedText); 81 consumerSession.commit(); 82 log.info("committed transaction"); 83 } 84 catch (JMSException e) { 85 try { 86 consumerSession.rollback(); 87 log.info("rolled back transaction"); 88 } 89 catch (JMSException e1) { 90 log.info(e1); 91 e1.printStackTrace(); 92 } 93 log.info(e); 94 e.printStackTrace(); 95 } 96 } 97 }); 98 99 connection.start(); 100 101 TextMessage tm = null; 102 try { 103 tm = producerSession.createTextMessage(); 104 tm.setText("Hello, " + new Date ()); 105 producer.send(tm); 106 log.info("producer sent message :" + tm.getText()); 107 } 108 catch (JMSException e) { 109 e.printStackTrace(); 110 } 111 112 log.info("Waiting for latch"); 113 latch.await(); 114 115 log.info("test completed, destination=" + receivedText); 116 } 117 118 protected void tearDown() throws Exception { 119 if (connection != null) { 120 connection.close(); 121 } 122 super.tearDown(); 123 } 124 } 125 | Popular Tags |