KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > activemq > usecases > CompositePublishTest


/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.activemq.usecases;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.test.JmsSendReceiveTestSupport;

import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import java.util.List;

33 /**
34  * @version $Revision: 1.1.1.1 $
35  */

36 public class CompositePublishTest extends JmsSendReceiveTestSupport {
37
38     protected Connection JavaDoc sendConnection;
39     protected Connection JavaDoc receiveConnection;
40     protected Session JavaDoc receiveSession;
41     protected MessageConsumer JavaDoc[] consumers;
42     protected List JavaDoc[] messageLists;
43
44     protected void setUp() throws Exception JavaDoc {
45         super.setUp();
46
47         connectionFactory = createConnectionFactory();
48
49         sendConnection = createConnection();
50         sendConnection.start();
51
52         receiveConnection = createConnection();
53         receiveConnection.start();
54
55         log.info("Created sendConnection: " + sendConnection);
56         log.info("Created receiveConnection: " + receiveConnection);
57
58         session = sendConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
59         receiveSession = receiveConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
60
61         log.info("Created sendSession: " + session);
62         log.info("Created receiveSession: " + receiveSession);
63
64         producer = session.createProducer(null);
65
66         log.info("Created producer: " + producer);
67
68         if (topic) {
69             consumerDestination = session.createTopic(getConsumerSubject());
70             producerDestination = session.createTopic(getProducerSubject());
71         }
72         else {
73             consumerDestination = session.createQueue(getConsumerSubject());
74             producerDestination = session.createQueue(getProducerSubject());
75         }
76
77         log.info("Created consumer destination: " + consumerDestination + " of type: " + consumerDestination.getClass());
78         log.info("Created producer destination: " + producerDestination + " of type: " + producerDestination.getClass());
79
80         Destination JavaDoc[] destinations = getDestinations();
81         consumers = new MessageConsumer JavaDoc[destinations.length];
82         messageLists = new List JavaDoc[destinations.length];
83         for (int i = 0; i < destinations.length; i++) {
84             Destination JavaDoc dest = destinations[i];
85             messageLists[i] = createConcurrentList();
86             consumers[i] = receiveSession.createConsumer(dest);
87             consumers[i].setMessageListener(createMessageListener(i, messageLists[i]));
88         }
89
90
91         log.info("Started connections");
92     }
93
94     protected MessageListener JavaDoc createMessageListener(int i, final List JavaDoc messageList) {
95         return new MessageListener JavaDoc() {
96             public void onMessage(Message JavaDoc message) {
97                 consumeMessage(message, messageList);
98             }
99         };
100     }
101
102     /**
103      * Returns the subject on which we publish
104      */

105     protected String JavaDoc getSubject() {
106         return getPrefix() + "FOO.BAR," + getPrefix() + "FOO.X.Y";
107     }
108
109     /**
110      * Returns the destinations to which we consume
111      */

112     protected Destination JavaDoc[] getDestinations() {
113         return new Destination JavaDoc[]{new ActiveMQTopic(getPrefix() + "FOO.BAR"), new ActiveMQTopic(getPrefix() + "FOO.*"), new ActiveMQTopic(getPrefix() + "FOO.X.Y")};
114     }
115
116     protected String JavaDoc getPrefix() {
117         return super.getSubject() + ".";
118     }
119
120     protected void assertMessagesAreReceived() throws JMSException JavaDoc {
121         waitForMessagesToBeDelivered();
122
123         for (int i = 0, size = messageLists.length; i < size; i++) {
124             log.info("Message list: " + i + " contains: " + messageLists[i].size() + " message(s)");
125         }
126
127         for (int i = 0, size = messageLists.length; i < size; i++) {
128             assertMessagesReceivedAreValid(messageLists[i]);
129         }
130     }
131
132     protected ActiveMQConnectionFactory createConnectionFactory() {
133         return new ActiveMQConnectionFactory("vm://localhost");
134     }
135
136     protected void tearDown() throws Exception JavaDoc {
137         session.close();
138         receiveSession.close();
139
140         sendConnection.close();
141         receiveConnection.close();
142     }
143 }
144
Popular Tags