1 18 package org.apache.activemq; 19 20 import javax.jms.Connection ; 21 import javax.jms.JMSException ; 22 import javax.jms.Message ; 23 import javax.jms.MessageConsumer ; 24 import javax.jms.MessageProducer ; 25 import javax.jms.Queue ; 26 import javax.jms.Session ; 27 28 31 public class JmsClientAckTest extends TestSupport { 32 33 private Connection connection; 34 35 protected void setUp() throws Exception { 36 super.setUp(); 37 connection = createConnection(); 38 } 39 40 43 protected void tearDown() throws Exception { 44 if (connection != null) { 45 connection.close(); 46 connection = null; 47 } 48 super.tearDown(); 49 } 50 51 56 public void testAckedMessageAreConsumed() throws JMSException { 57 connection.start(); 58 Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); 59 Queue queue = session.createQueue(getQueueName()); 60 MessageProducer producer = session.createProducer(queue); 61 producer.send(session.createTextMessage("Hello")); 62 63 MessageConsumer consumer = session.createConsumer(queue); 65 Message msg = consumer.receive(1000); 66 assertNotNull(msg); 67 msg.acknowledge(); 68 69 session.close(); 71 session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); 72 73 consumer = session.createConsumer(queue); 75 msg = consumer.receive(1000); 76 assertNull(msg); 77 78 session.close(); 79 } 80 81 86 public void testLastMessageAcked() throws JMSException { 87 connection.start(); 88 Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); 89 Queue queue = session.createQueue(getQueueName()); 90 MessageProducer producer = session.createProducer(queue); 91 producer.send(session.createTextMessage("Hello")); 92 producer.send(session.createTextMessage("Hello2")); 93 producer.send(session.createTextMessage("Hello3")); 94 95 MessageConsumer consumer = session.createConsumer(queue); 97 Message msg = consumer.receive(1000); 98 assertNotNull(msg); 99 msg = consumer.receive(1000); 100 assertNotNull(msg); 101 msg = consumer.receive(1000); 102 assertNotNull(msg); 103 msg.acknowledge(); 104 105 session.close(); 107 session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); 108 109 consumer = session.createConsumer(queue); 111 msg = consumer.receive(1000); 112 assertNull(msg); 113 114 session.close(); 115 } 116 117 122 public void testUnAckedMessageAreNotConsumedOnSessionClose() throws JMSException { 123 connection.start(); 124 Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); 125 Queue queue = session.createQueue(getQueueName()); 126 MessageProducer producer = session.createProducer(queue); 127 producer.send(session.createTextMessage("Hello")); 128 129 MessageConsumer consumer = session.createConsumer(queue); 131 Message msg = consumer.receive(1000); 132 assertNotNull(msg); 133 135 session.close(); 137 session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); 138 139 consumer = session.createConsumer(queue); 141 msg = consumer.receive(2000); 142 assertNotNull(msg); 143 msg.acknowledge(); 144 145 session.close(); 146 } 147 148 protected String getQueueName() { 149 return getClass().getName() + "." + getName(); 150 } 151 152 } 153 | Popular Tags |