1 18 package org.apache.activemq; 19 20 import javax.jms.Connection ; 21 import javax.jms.Message ; 22 import javax.jms.MessageConsumer ; 23 import javax.jms.MessageListener ; 24 import javax.jms.MessageProducer ; 25 import javax.jms.Queue ; 26 import javax.jms.Session ; 27 28 31 public class JmsClientAckListenerTest extends TestSupport implements MessageListener { 32 33 private Connection connection; 34 private boolean dontAck=false; 35 36 protected void setUp() throws Exception { 37 super.setUp(); 38 connection = createConnection(); 39 } 40 41 44 protected void tearDown() throws Exception { 45 if (connection != null) { 46 connection.close(); 47 connection = null; 48 } 49 super.tearDown(); 50 } 51 52 57 public void testAckedMessageAreConsumed() throws Exception { 58 connection.start(); 59 Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); 60 Queue queue = session.createQueue("test"); 61 MessageProducer producer = session.createProducer(queue); 62 producer.send(session.createTextMessage("Hello")); 63 64 MessageConsumer consumer = session.createConsumer(queue); 66 consumer.setMessageListener(this); 67 68 Thread.sleep(10000); 69 70 session.close(); 72 73 74 75 session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); 76 77 consumer = session.createConsumer(queue); 79 Message msg = consumer.receive(1000); 80 assertNull(msg); 81 82 session.close(); 83 } 84 85 90 public void testUnAckedMessageAreNotConsumedOnSessionClose() throws Exception { 91 connection.start(); 92 dontAck=true; 94 Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); 95 Queue queue = session.createQueue("test"); 96 MessageProducer producer = session.createProducer(queue); 97 producer.send(session.createTextMessage("Hello")); 98 99 MessageConsumer consumer = session.createConsumer(queue); 101 consumer.setMessageListener(this); 102 104 session.close(); 106 107 Thread.sleep(10000); 108 session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); 109 consumer = session.createConsumer(queue); 111 Message msg = consumer.receive(2000); 112 assertNotNull(msg); 113 msg.acknowledge(); 114 115 session.close(); 116 } 117 118 119 public void onMessage(Message message){ 120 121 assertNotNull(message); 122 if(!dontAck) { 123 try { 124 message.acknowledge(); 125 }catch(Exception e){ 126 e.printStackTrace(); 127 } 128 129 } 130 131 } 132 } 133 | Popular Tags |