KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > activemq > LargeStreamletTest


1 package org.apache.activemq;
2 /**
3 *
4 * Licensed to the Apache Software Foundation (ASF) under one or more
5 * contributor license agreements. See the NOTICE file distributed with
6 * this work for additional information regarding copyright ownership.
7 * The ASF licenses this file to You under the Apache License, Version 2.0
8 * (the "License"); you may not use this file except in compliance with
9 * the License. You may obtain a copy of the License at
10 *
11 * http://www.apache.org/licenses/LICENSE-2.0
12 *
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 */

19
20 import org.apache.commons.logging.Log;
21 import org.apache.commons.logging.LogFactory;
22
23 import java.io.InputStream JavaDoc;
24 import java.io.OutputStream JavaDoc;
25 import java.util.Random JavaDoc;
26
27 import javax.jms.Destination JavaDoc;
28 import javax.jms.Session JavaDoc;
29
30 import junit.framework.Assert;
31 import junit.framework.TestCase;
32 import java.util.concurrent.atomic.AtomicBoolean JavaDoc;
33 import java.util.concurrent.atomic.AtomicInteger JavaDoc;
34
35 /**
36  * @author rnewson
37  */

38 public final class LargeStreamletTest extends TestCase {
39
40     private static final Log log = LogFactory.getLog(LargeStreamletTest.class);
41     
42     private static final String JavaDoc BROKER_URL = "vm://localhost?broker.persistent=false";
43
44     private static final int BUFFER_SIZE = 1 * 1024;
45
46     private static final int MESSAGE_COUNT = 10*1024;
47     
48     private AtomicInteger JavaDoc totalRead = new AtomicInteger JavaDoc();
49
50     private AtomicInteger JavaDoc totalWritten = new AtomicInteger JavaDoc();
51
52     private AtomicBoolean JavaDoc stopThreads = new AtomicBoolean JavaDoc(false);
53
54     protected Exception JavaDoc writerException;
55
56     protected Exception JavaDoc readerException;
57
58     public void testStreamlets() throws Exception JavaDoc {
59         final ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(
60                 BROKER_URL);
61
62         final ActiveMQConnection connection = (ActiveMQConnection) factory
63                 .createConnection();
64         connection.start();
65         try {
66             final Session JavaDoc session = connection.createSession(false,
67                     Session.AUTO_ACKNOWLEDGE);
68             try {
69                 final Destination JavaDoc destination = session.createQueue("wibble");
70                 final Thread JavaDoc readerThread = new Thread JavaDoc(new Runnable JavaDoc() {
71
72                     public void run() {
73                         totalRead.set(0);
74                         try {
75                             final InputStream JavaDoc inputStream = connection
76                                     .createInputStream(destination);
77                             try {
78                                 int read;
79                                 final byte[] buf = new byte[BUFFER_SIZE];
80                                 while (!stopThreads.get()
81                                         && (read = inputStream.read(buf)) != -1) {
82                                     totalRead.addAndGet(read);
83                                 }
84                             } finally {
85                                 inputStream.close();
86                             }
87                         } catch (Exception JavaDoc e) {
88                             readerException = e;
89                             e.printStackTrace();
90                         } finally {
91                             log.info(totalRead + " total bytes read.");
92                         }
93                     }
94                 });
95
96                 final Thread JavaDoc writerThread = new Thread JavaDoc(new Runnable JavaDoc() {
97
98                     public void run() {
99                         totalWritten.set(0);
100                         int count = MESSAGE_COUNT;
101                         try {
102                             final OutputStream JavaDoc outputStream = connection
103                                     .createOutputStream(destination);
104                             try {
105                                 final byte[] buf = new byte[BUFFER_SIZE];
106                                 new Random JavaDoc().nextBytes(buf);
107                                 while (count > 0 && !stopThreads.get()) {
108                                     outputStream.write(buf);
109                                     totalWritten.addAndGet(buf.length);
110                                     count--;
111                                 }
112                             } finally {
113                                 outputStream.close();
114                             }
115                         } catch (Exception JavaDoc e) {
116                             writerException = e;
117                             e.printStackTrace();
118                         } finally {
119                             log.info(totalWritten
120                                     + " total bytes written.");
121                         }
122                     }
123                 });
124
125                 readerThread.start();
126                 writerThread.start();
127
128                 
129                 // Wait till reader is has finished receiving all the messages or he has stopped
130
// receiving messages.
131
Thread.sleep(1000);
132                 int lastRead = totalRead.get();
133                 while( readerThread.isAlive() ) {
134                     readerThread.join(1000);
135                     // No progress?? then stop waiting..
136
if( lastRead == totalRead.get() ) {
137                         break;
138                     }
139                     lastRead = totalRead.get();
140                 }
141                 
142                 stopThreads.set(true);
143
144                 assertTrue("Should not have received a reader exception", readerException == null);
145                 assertTrue("Should not have received a writer exception", writerException == null);
146                 
147                 Assert.assertEquals("Not all messages accounted for",
148                         totalWritten.get(), totalRead.get());
149                 
150             } finally {
151                 session.close();
152             }
153         } finally {
154             connection.close();
155         }
156     }
157
158 }
159
Popular Tags