1 14 15 package org.apache.activemq.broker.region.cursors; 16 17 import javax.jms.Connection ; 18 import javax.jms.ConnectionFactory ; 19 import javax.jms.Destination ; 20 import javax.jms.JMSException ; 21 import javax.jms.MessageConsumer ; 22 import javax.jms.Session ; 23 import org.apache.activemq.broker.BrokerService; 24 import org.apache.activemq.broker.region.policy.PolicyEntry; 25 import org.apache.activemq.broker.region.policy.PolicyMap; 26 import org.apache.activemq.broker.region.policy.StorePendingDurableSubscriberMessageStoragePolicy; 27 import org.apache.activemq.broker.region.policy.StorePendingQueueMessageStoragePolicy; 28 29 32 public class CursorQueueStoreTest extends CursorSupport{ 33 34 protected Destination getDestination(Session session) throws JMSException { 35 String queueName="QUEUE" + getClass().getName(); 36 return session.createQueue(queueName); 37 } 38 39 protected Connection getConsumerConnection(ConnectionFactory fac) throws JMSException { 40 Connection connection=fac.createConnection(); 41 connection.setClientID("testConsumer"); 42 connection.start(); 43 return connection; 44 } 45 46 protected MessageConsumer getConsumer(Connection connection) throws Exception { 47 Session consumerSession=connection.createSession(false,Session.AUTO_ACKNOWLEDGE); 48 Destination dest = getDestination(consumerSession); 49 MessageConsumer consumer=consumerSession.createConsumer(dest); 50 return consumer; 51 } 52 53 54 protected void configureBroker(BrokerService answer) throws Exception { 55 PolicyEntry policy = new PolicyEntry(); 56 policy.setPendingQueuePolicy(new StorePendingQueueMessageStoragePolicy()); 57 PolicyMap pMap = new PolicyMap(); 58 pMap.setDefaultEntry(policy); 59 answer.setDestinationPolicy(pMap); 60 answer.setDeleteAllMessagesOnStartup(true); 61 answer.addConnector(bindAddress); 62 answer.setDeleteAllMessagesOnStartup(true); 63 } 64 } 65 | Popular Tags |