KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > activemq > transport > udp > UdpTestSupport


1 /**
2  *
3  * Licensed to the Apache Software Foundation (ASF) under one or more
4  * contributor license agreements. See the NOTICE file distributed with
5  * this work for additional information regarding copyright ownership.
6  * The ASF licenses this file to You under the Apache License, Version 2.0
7  * (the "License"); you may not use this file except in compliance with
8  * the License. You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */

18 package org.apache.activemq.transport.udp;
19
20 import java.io.IOException JavaDoc;
21
22 import javax.jms.MessageNotWriteableException JavaDoc;
23
24 import junit.framework.TestCase;
25
26 import org.apache.activemq.command.ActiveMQDestination;
27 import org.apache.activemq.command.ActiveMQQueue;
28 import org.apache.activemq.command.ActiveMQTextMessage;
29 import org.apache.activemq.command.Command;
30 import org.apache.activemq.command.ConsumerInfo;
31 import org.apache.activemq.command.ProducerInfo;
32 import org.apache.activemq.command.Response;
33 import org.apache.activemq.command.WireFormatInfo;
34 import org.apache.activemq.transport.Transport;
35 import org.apache.activemq.transport.TransportAcceptListener;
36 import org.apache.activemq.transport.TransportListener;
37 import org.apache.activemq.transport.TransportServer;
38 import org.apache.commons.logging.Log;
39 import org.apache.commons.logging.LogFactory;
40
41 /**
42  *
43  * @version $Revision: 464110 $
44  */

45 public abstract class UdpTestSupport extends TestCase implements TransportListener {
46
47     protected static final Log log = LogFactory.getLog(UdpTestSupport.class);
48
49     protected Transport producer;
50     protected Transport consumer;
51
52     protected Object JavaDoc lock = new Object JavaDoc();
53     protected Command receivedCommand;
54     protected TransportServer server;
55     protected boolean large;
56
57     // You might want to set this to massive number if debugging
58
protected int waitForCommandTimeout = 40000;
59
60     public void testSendingSmallMessage() throws Exception JavaDoc {
61         ConsumerInfo expected = new ConsumerInfo();
62         expected.setSelector("Cheese");
63         expected.setExclusive(true);
64         expected.setExclusive(true);
65         expected.setPrefetchSize(3456);
66
67         try {
68             log.info("About to send: " + expected);
69             producer.oneway(expected);
70
71             Command received = assertCommandReceived();
72             assertTrue("Should have received a ConsumerInfo but was: " + received, received instanceof ConsumerInfo);
73             ConsumerInfo actual = (ConsumerInfo) received;
74             assertEquals("Selector", expected.getSelector(), actual.getSelector());
75             assertEquals("isExclusive", expected.isExclusive(), actual.isExclusive());
76             assertEquals("getPrefetchSize", expected.getPrefetchSize(), actual.getPrefetchSize());
77         }
78         catch (Exception JavaDoc e) {
79             log.info("Caught: " + e);
80             e.printStackTrace();
81             fail("Failed to send to transport: " + e);
82         }
83     }
84
85     public void testSendingMediumMessage() throws Exception JavaDoc {
86         String JavaDoc text = createMessageBodyText(4 * 105);
87         ActiveMQDestination destination = new ActiveMQQueue("Foo.Bar.Medium");
88         assertSendTextMessage(destination, text);
89     }
90
91     public void testSendingLargeMessage() throws Exception JavaDoc {
92         String JavaDoc text = createMessageBodyText(4 * 1024);
93         ActiveMQDestination destination = new ActiveMQQueue("Foo.Bar.Large");
94         assertSendTextMessage(destination, text);
95     }
96
97     protected void assertSendTextMessage(ActiveMQDestination destination, String JavaDoc text)
98             throws MessageNotWriteableException JavaDoc {
99         large = true;
100
101         ActiveMQTextMessage expected = new ActiveMQTextMessage();
102
103         expected.setText(text);
104         expected.setDestination(destination);
105
106         try {
107             log.info("About to send message of type: " + expected.getClass());
108             producer.oneway(expected);
109
110             // lets send a dummy command to ensure things don't block if we
111
// discard the last one
112
// keepalive does not have a commandId...
113
// producer.oneway(new KeepAliveInfo());
114
producer.oneway(new ProducerInfo());
115             producer.oneway(new ProducerInfo());
116
117             Command received = assertCommandReceived();
118             assertTrue("Should have received a ActiveMQTextMessage but was: " + received,
119                     received instanceof ActiveMQTextMessage);
120             ActiveMQTextMessage actual = (ActiveMQTextMessage) received;
121
122             assertEquals("getDestination", expected.getDestination(), actual.getDestination());
123             assertEquals("getText", expected.getText(), actual.getText());
124
125             log.info("Received text message with: " + actual.getText().length() + " character(s)");
126         }
127         catch (Exception JavaDoc e) {
128             log.info("Caught: " + e);
129             e.printStackTrace();
130             fail("Failed to send to transport: " + e);
131         }
132     }
133
134     protected String JavaDoc createMessageBodyText(int loopSize) {
135         StringBuffer JavaDoc buffer = new StringBuffer JavaDoc();
136         for (int i = 0; i < loopSize; i++) {
137             buffer.append("0123456789");
138         }
139         return buffer.toString();
140     }
141
142     protected void setUp() throws Exception JavaDoc {
143         server = createServer();
144         if (server != null) {
145             server.setAcceptListener(new TransportAcceptListener() {
146
147                 public void onAccept(Transport transport) {
148                     consumer = transport;
149                     consumer.setTransportListener(UdpTestSupport.this);
150                     try {
151                         consumer.start();
152                     }
153                     catch (Exception JavaDoc e) {
154                         throw new RuntimeException JavaDoc(e);
155                     }
156                 }
157
158                 public void onAcceptError(Exception JavaDoc error) {
159                 }
160             });
161             server.start();
162         }
163
164         consumer = createConsumer();
165         if (consumer != null) {
166             consumer.setTransportListener(this);
167             consumer.start();
168         }
169
170         producer = createProducer();
171         producer.setTransportListener(new TransportListener() {
172             public void onCommand(Object JavaDoc command) {
173                 log.info("Producer received: " + command);
174             }
175
176             public void onException(IOException JavaDoc error) {
177                 log.info("Producer exception: " + error);
178                 error.printStackTrace();
179             }
180
181             public void transportInterupted() {
182             }
183
184             public void transportResumed() {
185             }
186         });
187
188         producer.start();
189     }
190
191     protected void tearDown() throws Exception JavaDoc {
192         if (producer != null) {
193             producer.stop();
194         }
195         if (consumer != null) {
196             consumer.stop();
197         }
198         if (server != null) {
199             server.stop();
200         }
201     }
202
203     public void onCommand(Object JavaDoc o) {
204         final Command command = (Command) o;
205         if (command instanceof WireFormatInfo) {
206             log.info("Got WireFormatInfo: " + command);
207         }
208         else {
209             if (command.isResponseRequired()) {
210                 // lets send a response back...
211
sendResponse(command);
212
213             }
214             if (large) {
215                 log.info("### Received command: " + command.getClass() + " with id: "
216                         + command.getCommandId());
217             }
218             else {
219                 log.info("### Received command: " + command);
220             }
221
222             synchronized (lock) {
223                 if (receivedCommand == null) {
224                     receivedCommand = command;
225                 }
226                 else {
227                     log.info("Ignoring superfluous command: " + command);
228                 }
229                 lock.notifyAll();
230             }
231         }
232     }
233
234     protected void sendResponse(Command command) {
235         Response response = new Response();
236         response.setCorrelationId(command.getCommandId());
237         try {
238             consumer.oneway(response);
239         }
240         catch (IOException JavaDoc e) {
241             log.info("Caught: " + e);
242             e.printStackTrace();
243             throw new RuntimeException JavaDoc(e);
244         }
245     }
246
247     public void onException(IOException JavaDoc error) {
248         log.info("### Received error: " + error);
249         error.printStackTrace();
250     }
251
252     public void transportInterupted() {
253         log.info("### Transport interrupted");
254     }
255
256     public void transportResumed() {
257         log.info("### Transport resumed");
258     }
259
260     protected Command assertCommandReceived() throws InterruptedException JavaDoc {
261         Command answer = null;
262         synchronized (lock) {
263             answer = receivedCommand;
264             if (answer == null) {
265                 lock.wait(waitForCommandTimeout);
266             }
267             answer = receivedCommand;
268         }
269
270         assertNotNull("Should have received a Command by now!", answer);
271         return answer;
272     }
273
274     protected abstract Transport createConsumer() throws Exception JavaDoc;
275
276     protected abstract Transport createProducer() throws Exception JavaDoc;
277
278     protected TransportServer createServer() throws Exception JavaDoc {
279         return null;
280     }
281
282 }
283
Popular Tags