1 package org.apache.activemq; 2 19 20 import org.apache.commons.logging.Log; 21 import org.apache.commons.logging.LogFactory; 22 23 import java.io.InputStream ; 24 import java.io.OutputStream ; 25 import java.util.Random ; 26 27 import javax.jms.Destination ; 28 import javax.jms.Session ; 29 30 import junit.framework.Assert; 31 import junit.framework.TestCase; 32 import java.util.concurrent.atomic.AtomicBoolean ; 33 import java.util.concurrent.atomic.AtomicInteger ; 34 35 38 public final class LargeStreamletTest extends TestCase { 39 40 private static final Log log = LogFactory.getLog(LargeStreamletTest.class); 41 42 private static final String BROKER_URL = "vm://localhost?broker.persistent=false"; 43 44 private static final int BUFFER_SIZE = 1 * 1024; 45 46 private static final int MESSAGE_COUNT = 10*1024; 47 48 private AtomicInteger totalRead = new AtomicInteger (); 49 50 private AtomicInteger totalWritten = new AtomicInteger (); 51 52 private AtomicBoolean stopThreads = new AtomicBoolean (false); 53 54 protected Exception writerException; 55 56 protected Exception readerException; 57 58 public void testStreamlets() throws Exception { 59 final ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory( 60 BROKER_URL); 61 62 final ActiveMQConnection connection = (ActiveMQConnection) factory 63 .createConnection(); 64 connection.start(); 65 try { 66 final Session session = connection.createSession(false, 67 Session.AUTO_ACKNOWLEDGE); 68 try { 69 final Destination destination = session.createQueue("wibble"); 70 final Thread readerThread = new Thread (new Runnable () { 71 72 public void run() { 73 totalRead.set(0); 74 try { 75 final InputStream inputStream = connection 76 .createInputStream(destination); 77 try { 78 int read; 79 final byte[] buf = new byte[BUFFER_SIZE]; 80 while (!stopThreads.get() 81 && (read = inputStream.read(buf)) != -1) { 82 totalRead.addAndGet(read); 83 } 84 } finally { 85 inputStream.close(); 86 } 87 } catch (Exception e) { 88 readerException = e; 89 e.printStackTrace(); 90 } finally { 91 log.info(totalRead + " total bytes read."); 92 } 93 } 94 }); 95 96 final Thread writerThread = new Thread (new Runnable () { 97 98 public void run() { 99 totalWritten.set(0); 100 int count = MESSAGE_COUNT; 101 try { 102 final OutputStream outputStream = connection 103 .createOutputStream(destination); 104 try { 105 final byte[] buf = new byte[BUFFER_SIZE]; 106 new Random ().nextBytes(buf); 107 while (count > 0 && !stopThreads.get()) { 108 outputStream.write(buf); 109 totalWritten.addAndGet(buf.length); 110 count--; 111 } 112 } finally { 113 outputStream.close(); 114 } 115 } catch (Exception e) { 116 writerException = e; 117 e.printStackTrace(); 118 } finally { 119 log.info(totalWritten 120 + " total bytes written."); 121 } 122 } 123 }); 124 125 readerThread.start(); 126 writerThread.start(); 127 128 129 Thread.sleep(1000); 132 int lastRead = totalRead.get(); 133 while( readerThread.isAlive() ) { 134 readerThread.join(1000); 135 if( lastRead == totalRead.get() ) { 137 break; 138 } 139 lastRead = totalRead.get(); 140 } 141 142 stopThreads.set(true); 143 144 assertTrue("Should not have received a reader exception", readerException == null); 145 assertTrue("Should not have received a writer exception", writerException == null); 146 147 Assert.assertEquals("Not all messages accounted for", 148 totalWritten.get(), totalRead.get()); 149 150 } finally { 151 session.close(); 152 } 153 } finally { 154 connection.close(); 155 } 156 } 157 158 } 159 | Popular Tags |