1 18 package org.apache.activemq.transport.failover; 19 20 import java.net.URI ; 21 22 import javax.jms.Connection ; 23 import javax.jms.Message ; 24 import javax.jms.MessageConsumer ; 25 import javax.jms.MessageProducer ; 26 import javax.jms.Session ; 27 28 import org.apache.activemq.ActiveMQConnectionFactory; 29 import org.apache.activemq.ActiveMQPrefetchPolicy; 30 import org.apache.activemq.command.ActiveMQQueue; 31 import org.apache.activemq.network.NetworkTestSupport; 32 33 public class FailoverConsumerTest extends NetworkTestSupport { 34 35 private static final org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory 36 .getLog(FailoverConsumerTest.class); 37 38 public static final int MSG_COUNT = 100; 39 40 public void testPublisherFailsOver() throws Exception { 41 URI failoverURI = new URI ("failover://tcp://localhost:61616"); 45 46 ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(failoverURI); 47 ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy(); 48 49 prefetchPolicy.setQueuePrefetch(MSG_COUNT - 10); 51 factory.setPrefetchPolicy(prefetchPolicy); 52 Connection connection = factory.createConnection(); 53 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 54 MessageProducer producer = session.createProducer(new ActiveMQQueue("Test")); 55 for (int idx = 0; idx < MSG_COUNT; ++idx) { 56 producer.send(session.createTextMessage("Test")); 57 } 58 producer.close(); 59 session.close(); 60 int count = 0; 61 62 Session consumerSession = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); 63 MessageConsumer consumer = consumerSession.createConsumer(new ActiveMQQueue("Test")); 64 connection.start(); 65 Message msg = consumer.receive(3000); 66 67 log.info("You should restart remote broker now and press enter!"); 71 System.in.read(); 72 restartRemoteBroker(); 74 msg.acknowledge(); 75 ++count; 76 77 for (int idx = 1; idx < MSG_COUNT; ++idx) { 78 msg = consumer.receive(3000); 79 if (msg == null) { 80 log.error("No messages received! Received:" + count); 81 break; 82 } 83 msg.acknowledge(); 84 ++count; 85 } 86 assertEquals(count, MSG_COUNT); 87 consumer.close(); 88 consumerSession.close(); 89 connection.close(); 90 91 connection = factory.createConnection(); 92 consumerSession = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); 93 consumer = consumerSession.createConsumer(new ActiveMQQueue("Test")); 94 connection.start(); 95 96 count = 0; 97 do { 98 msg = consumer.receive(1000); 99 if (msg != null) { 100 msg.acknowledge(); 101 ++count; 102 } 103 } 104 while (msg != null); 105 106 assertEquals(count, 0); 107 108 consumer.close(); 109 consumerSession.close(); 110 connection.close(); 111 } 112 113 protected String getRemoteURI() { 114 return "tcp://localhost:55555"; 115 } 116 } 117 | Popular Tags |