KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > activemq > JmsTopicRequestReplyTest


1 /**
2  *
3  * Licensed to the Apache Software Foundation (ASF) under one or more
4  * contributor license agreements. See the NOTICE file distributed with
5  * this work for additional information regarding copyright ownership.
6  * The ASF licenses this file to You under the Apache License, Version 2.0
7  * (the "License"); you may not use this file except in compliance with
8  * the License. You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */

18 package org.apache.activemq;
19
20 import java.util.List JavaDoc;
21 import java.util.Vector JavaDoc;
22
23 import javax.jms.Connection JavaDoc;
24 import javax.jms.Destination JavaDoc;
25 import javax.jms.JMSException JavaDoc;
26 import javax.jms.Message JavaDoc;
27 import javax.jms.MessageConsumer JavaDoc;
28 import javax.jms.MessageListener JavaDoc;
29 import javax.jms.MessageProducer JavaDoc;
30 import javax.jms.Session JavaDoc;
31 import javax.jms.TextMessage JavaDoc;
32
33 import org.apache.activemq.test.TestSupport;
34 import org.apache.commons.logging.Log;
35 import org.apache.commons.logging.LogFactory;
36
37 /**
38  * @version $Revision: 1.3 $
39  */

40 public class JmsTopicRequestReplyTest extends TestSupport implements MessageListener JavaDoc {
41     private final Log log = LogFactory.getLog(getClass());
42
43     private Connection JavaDoc serverConnection;
44     private Connection JavaDoc clientConnection;
45     private MessageProducer JavaDoc replyProducer;
46     private Session JavaDoc serverSession;
47     private Destination JavaDoc requestDestination;
48     private List JavaDoc failures = new Vector JavaDoc();
49     private boolean dynamicallyCreateProducer;
50     protected boolean useAsyncConsume = false;
51     private String JavaDoc clientSideClientID;
52
53     public void testSendAndReceive() throws Exception JavaDoc {
54         clientConnection = createConnection();
55         clientConnection.setClientID("ClientConnection:" + getSubject());
56
57         Session JavaDoc session = clientConnection.createSession(false,
58                 Session.AUTO_ACKNOWLEDGE);
59
60         clientConnection.start();
61
62         Destination JavaDoc replyDestination = createTemporaryDestination(session);
63
64
65         // lets test the destination
66
clientSideClientID = clientConnection.getClientID();
67         
68         //TODO
69
//String value = ActiveMQDestination.getClientId((ActiveMQDestination) replyDestination);
70
//assertEquals("clientID from the temporary destination must be the same", clientSideClientID, value);
71
log.info("Both the clientID and destination clientID match properly: " + clientSideClientID);
72
73
74         /* build queues */
75         MessageProducer JavaDoc requestProducer =
76                 session.createProducer(requestDestination);
77         MessageConsumer JavaDoc replyConsumer =
78                 session.createConsumer(replyDestination);
79
80
81         /* build requestmessage */
82         TextMessage JavaDoc requestMessage = session.createTextMessage("Olivier");
83         requestMessage.setJMSReplyTo(replyDestination);
84         requestProducer.send(requestMessage);
85
86         log.info("Sent request.");
87         log.info(requestMessage.toString());
88
89         Message msg = replyConsumer.receive(5000);
90
91
92         if (msg instanceof TextMessage JavaDoc) {
93             TextMessage JavaDoc replyMessage = (TextMessage JavaDoc) msg;
94             log.info("Received reply.");
95             log.info(replyMessage.toString());
96             assertEquals("Wrong message content", "Hello: Olivier", replyMessage.getText());
97         }
98         else {
99             fail("Should have received a reply by now");
100         }
101
102         assertEquals("Should not have had any failures: " + failures, 0, failures.size());
103     }
104
105     public void testSendAndReceiveWithDynamicallyCreatedProducer() throws Exception JavaDoc {
106         dynamicallyCreateProducer = true;
107         testSendAndReceive();
108     }
109
110     /**
111      * Use the asynchronous subscription mechanism
112      */

113     public void onMessage(Message message) {
114         try {
115             TextMessage JavaDoc requestMessage = (TextMessage JavaDoc) message;
116
117             log.info("Received request.");
118             log.info(requestMessage.toString());
119
120             Destination JavaDoc replyDestination = requestMessage.getJMSReplyTo();
121
122             //TODO
123
//String value = ActiveMQDestination.getClientId((ActiveMQDestination) replyDestination);
124
//assertEquals("clientID from the temporary destination must be the same", clientSideClientID, value);
125

126             TextMessage JavaDoc replyMessage = serverSession.createTextMessage("Hello: " + requestMessage.getText());
127
128             replyMessage.setJMSCorrelationID(requestMessage.getJMSMessageID());
129
130             if (dynamicallyCreateProducer) {
131                 replyProducer = serverSession.createProducer(replyDestination);
132                 replyProducer.send(replyMessage);
133             }
134             else {
135                 replyProducer.send(replyDestination, replyMessage);
136             }
137
138             log.info("Sent reply.");
139             log.info(replyMessage.toString());
140         }
141         catch (JMSException JavaDoc e) {
142             onException(e);
143         }
144     }
145
146     /**
147      * Use the synchronous subscription mechanism
148      */

149     protected void syncConsumeLoop(MessageConsumer JavaDoc requestConsumer) {
150         try {
151             Message message = requestConsumer.receive(5000);
152             if (message != null) {
153                 onMessage(message);
154             }
155             else {
156                 log.error("No message received");
157             }
158         }
159         catch (JMSException JavaDoc e) {
160             onException(e);
161         }
162     }
163
164
165     protected void setUp() throws Exception JavaDoc {
166         super.setUp();
167
168         serverConnection = createConnection();
169         serverConnection.setClientID("serverConnection:" + getSubject());
170         serverSession = serverConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
171
172         replyProducer = serverSession.createProducer(null);
173
174         requestDestination = createDestination(serverSession);
175
176         /* build queues */
177         final MessageConsumer JavaDoc requestConsumer = serverSession.createConsumer(requestDestination);
178         if (useAsyncConsume) {
179             requestConsumer.setMessageListener(this);
180         }
181         else {
182             Thread JavaDoc thread = new Thread JavaDoc(new Runnable JavaDoc() {
183                 public void run() {
184                     syncConsumeLoop(requestConsumer);
185                 }
186             });
187             thread.start();
188         }
189         serverConnection.start();
190     }
191
192     protected void tearDown() throws Exception JavaDoc {
193         super.tearDown();
194
195         serverConnection.close();
196         clientConnection.stop();
197         clientConnection.close();
198     }
199
200     protected void onException(JMSException JavaDoc e) {
201         log.info("Caught: " + e);
202         e.printStackTrace();
203         failures.add(e);
204     }
205
206     protected Destination JavaDoc createDestination(Session JavaDoc session) throws JMSException JavaDoc {
207         if (topic) {
208             return session.createTopic(getSubject());
209         }
210         return session.createQueue(getSubject());
211     }
212
213     protected Destination JavaDoc createTemporaryDestination(Session JavaDoc session) throws JMSException JavaDoc {
214         if (topic) {
215             return session.createTemporaryTopic();
216         }
217         return session.createTemporaryQueue();
218     }
219
220 }
221
Popular Tags