KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > activemq > transport > failover > FailoverConsumerTest


1 /**
2  *
3  * Licensed to the Apache Software Foundation (ASF) under one or more
4  * contributor license agreements. See the NOTICE file distributed with
5  * this work for additional information regarding copyright ownership.
6  * The ASF licenses this file to You under the Apache License, Version 2.0
7  * (the "License"); you may not use this file except in compliance with
8  * the License. You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */

18 package org.apache.activemq.transport.failover;
19
20 import java.net.URI JavaDoc;
21
22 import javax.jms.Connection JavaDoc;
23 import javax.jms.Message JavaDoc;
24 import javax.jms.MessageConsumer JavaDoc;
25 import javax.jms.MessageProducer JavaDoc;
26 import javax.jms.Session JavaDoc;
27
28 import org.apache.activemq.ActiveMQConnectionFactory;
29 import org.apache.activemq.ActiveMQPrefetchPolicy;
30 import org.apache.activemq.command.ActiveMQQueue;
31 import org.apache.activemq.network.NetworkTestSupport;
32
33 public class FailoverConsumerTest extends NetworkTestSupport {
34     
35     private static final org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory
36             .getLog(FailoverConsumerTest.class);
37     
38     public static final int MSG_COUNT = 100;
39     
40     public void testPublisherFailsOver() throws Exception JavaDoc {
41         // Uncomment this if you want to use remote broker created by NetworkTestSupport.
42
// But it doesn't work. See comments below.
43
// URI failoverURI = new URI("failover://"+remoteConnector.getServer().getConnectURI());
44
URI JavaDoc failoverURI = new URI JavaDoc("failover://tcp://localhost:61616");
45
46         ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(failoverURI);
47         ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy();
48         
49         // Prefetch size must be less than messages in the queue!!
50
prefetchPolicy.setQueuePrefetch(MSG_COUNT - 10);
51         factory.setPrefetchPolicy(prefetchPolicy);
52         Connection JavaDoc connection = factory.createConnection();
53         Session JavaDoc session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
54         MessageProducer JavaDoc producer = session.createProducer(new ActiveMQQueue("Test"));
55         for (int idx = 0; idx < MSG_COUNT; ++idx) {
56             producer.send(session.createTextMessage("Test"));
57         }
58         producer.close();
59         session.close();
60         int count = 0;
61
62         Session JavaDoc consumerSession = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
63         MessageConsumer JavaDoc consumer = consumerSession.createConsumer(new ActiveMQQueue("Test"));
64         connection.start();
65         Message JavaDoc msg = consumer.receive(3000);
66         
67         // restartRemoteBroker() doesn't work (you won't get received any messages
68
// after restart, javadoc says, that messages should be received though).
69
// So we must use external broker ant restart it manually.
70
log.info("You should restart remote broker now and press enter!");
71         System.in.read();
72 // Thread.sleep(20000);
73
restartRemoteBroker();
74         msg.acknowledge();
75         ++count;
76         
77         for (int idx = 1; idx < MSG_COUNT; ++idx) {
78             msg = consumer.receive(3000);
79             if (msg == null) {
80                 log.error("No messages received! Received:" + count);
81                 break;
82             }
83             msg.acknowledge();
84             ++count;
85         }
86         assertEquals(count, MSG_COUNT);
87         consumer.close();
88         consumerSession.close();
89         connection.close();
90         
91         connection = factory.createConnection();
92         consumerSession = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
93         consumer = consumerSession.createConsumer(new ActiveMQQueue("Test"));
94         connection.start();
95
96         count = 0;
97         do {
98             msg = consumer.receive(1000);
99             if (msg != null) {
100                 msg.acknowledge();
101                 ++count;
102             }
103         }
104         while (msg != null);
105         
106         assertEquals(count, 0);
107         
108         consumer.close();
109         consumerSession.close();
110         connection.close();
111     }
112     
113     protected String JavaDoc getRemoteURI() {
114         return "tcp://localhost:55555";
115     }
116 }
117
Popular Tags