1 18 package org.apache.activemq.test.rollback; 19 20 import org.apache.commons.logging.Log; 21 import org.apache.commons.logging.LogFactory; 22 23 import javax.jms.Connection ; 24 import javax.jms.Destination ; 25 import javax.jms.JMSException ; 26 import javax.jms.Message ; 27 import javax.jms.MessageConsumer ; 28 import javax.jms.MessageListener ; 29 import javax.jms.Session ; 30 31 32 public class DelegatingTransactionalMessageListener implements MessageListener { 33 private static final transient Log log = LogFactory.getLog(DelegatingTransactionalMessageListener.class); 34 35 private final MessageListener underlyingListener; 36 private boolean transacted = true; 37 private int ackMode = Session.AUTO_ACKNOWLEDGE; 38 private Session session; 39 40 public DelegatingTransactionalMessageListener(MessageListener underlyingListener, Connection connection, Destination destination) { 41 this.underlyingListener = underlyingListener; 42 43 try { 44 session = connection.createSession(transacted, ackMode); 45 MessageConsumer consumer = session.createConsumer(destination); 46 consumer.setMessageListener(this); 47 } 48 catch (JMSException e) { 49 throw new IllegalStateException ("Could not listen to " + destination, e); 50 } 51 } 52 53 public void onMessage(Message message) { 54 try { 55 underlyingListener.onMessage(message); 56 session.commit(); 57 } 58 catch (Throwable e) { 59 rollback(); 60 } 61 } 62 63 private void rollback() { 64 try { 65 session.rollback(); 66 } 67 catch (JMSException e) { 68 log.error("Failed to rollback: " + e, e); 69 } 70 } 71 72 public Session getSession() { 73 return session; 74 } 75 } 76 | Popular Tags |