package org.apache.activemq.streams;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Arrays;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

import javax.jms.Destination;

import junit.framework.Test;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.JmsTestSupport;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;

/**
 * Exercises the stream API of {@link ActiveMQConnection}: data written to a
 * stream obtained from {@code createOutputStream(destination)} on one
 * connection is read back through {@code createInputStream(destination)} on a
 * second connection.
 */
public class JMSInputStreamTest extends JmsTestSupport {

    /** Writing side of the stream pair; created in {@link #setUp()}. */
    protected DataOutputStream out;

    /** Reading side of the stream pair; created in {@link #setUp()}. */
    protected DataInputStream in;

    /** Second connection, used only for the reading side. */
    private ActiveMQConnection connection2;

    /**
     * Destination the streams are bound to. Must stay a public field named
     * "destination": {@link #initCombos()} registers its candidate values
     * under that name.
     */
    public Destination destination;

    public static Test suite() {
        return suite(JMSInputStreamTest.class);
    }

    public static void main(String[] args) {
        junit.textui.TestRunner.run(suite());
    }

    /**
     * Registers the candidate values for {@link #destination}: one queue and
     * one topic. Presumably the inherited combination support runs every test
     * once per value -- confirm against the superclass.
     */
    public void initCombos() {
        addCombinationValues("destination", new Object[] {
            new ActiveMQQueue("TEST.QUEUE"),
            new ActiveMQTopic("TEST.TOPIC") });
    }

    /**
     * Creates the second connection and opens the output stream on the
     * inherited {@code connection} and the input stream on the second one.
     *
     * @throws Exception if the superclass set-up or stream creation fails
     */
    @Override
    protected void setUp() throws Exception {
        super.setAutoFail(true);
        super.setUp();
        connection2 = (ActiveMQConnection) factory.createConnection(userName, password);
        // Registered with the inherited collection; presumably super.tearDown()
        // closes everything in it -- confirm against JmsTestSupport.
        connections.add(connection2);
        out = new DataOutputStream(connection.createOutputStream(destination));
        in = new DataInputStream(connection2.createInputStream(destination));
    }

    /**
     * Closes both streams and then delegates to the superclass. The streams
     * are closed first because they sit on top of the connections that the
     * superclass is expected to tear down.
     *
     * @throws Exception if closing a stream or the superclass tear-down fails
     */
    @Override
    protected void tearDown() throws Exception {
        try {
            closeStreams();
        } finally {
            super.tearDown();
        }
    }

    /** Closes {@link #out} and {@link #in}; the reader is closed even if the writer fails. */
    private void closeStreams() throws IOException {
        try {
            if (out != null) {
                out.close();
            }
        } finally {
            if (in != null) {
                in.close();
            }
        }
    }

    /**
     * Round-trips an int, a float, a UTF string and a run of 100 longs,
     * flushing after each group so the reader can consume it.
     *
     * @throws Exception if any stream operation fails
     */
    public void testStreams() throws Exception {
        out.writeInt(4);
        out.flush();
        assertEquals(4, in.readInt());

        out.writeFloat(2.3f);
        out.flush();
        // Zero delta: the value must survive the round trip bit-for-bit.
        assertEquals(2.3f, in.readFloat(), 0.0f);

        String str = "this is a test string";
        out.writeUTF(str);
        out.flush();
        assertEquals(str, in.readUTF());

        for (int i = 0; i < 100; i++) {
            out.writeLong(i);
        }
        out.flush();
        for (int i = 0; i < 100; i++) {
            assertEquals((long) i, in.readLong());
        }
    }

    /**
     * Writes 1024 blocks of 4096 identical bytes while a background thread
     * concurrently reads and verifies 1024 chunks of 2048 bytes.
     * <p>
     * Anything that goes wrong in the reader thread (I/O error or content
     * mismatch) is captured and rethrown on the test thread, so the test fails
     * immediately with the real cause instead of timing out.
     *
     * @throws Exception if writing fails, the reader fails, or the reader does
     *                   not finish within 30 seconds
     */
    public void testLarge() throws Exception {
        final byte testData = 23;
        final int dataLength = 4096;
        final int readLength = 2048;
        final int count = 1024;

        byte[] data = new byte[dataLength];
        Arrays.fill(data, testData);

        final CountDownLatch readerDone = new CountDownLatch(1);
        final AtomicReference<Throwable> readerFailure = new AtomicReference<Throwable>();

        Thread runner = new Thread(new Runnable() {
            public void run() {
                try {
                    // readFully overwrites the whole buffer, so it can be reused.
                    byte[] chunk = new byte[readLength];
                    for (int x = 0; x < count; x++) {
                        in.readFully(chunk);
                        for (int i = 0; i < chunk.length; i++) {
                            if (chunk[i] != testData) {
                                fail("unexpected byte " + chunk[i] + " at offset " + i
                                    + " of chunk " + x + ", expected " + testData);
                            }
                        }
                    }
                } catch (Throwable t) {
                    // Thread boundary: hand the failure (including assertion
                    // errors) over to the test thread instead of losing it.
                    readerFailure.set(t);
                } finally {
                    // Always release the waiting test thread, success or not.
                    readerDone.countDown();
                }
            }
        }, "JMSInputStreamTest-reader");
        runner.start();

        for (int i = 0; i < count; i++) {
            out.write(data);
        }
        out.flush();

        boolean finished = readerDone.await(30, TimeUnit.SECONDS);
        rethrowIfFailed(readerFailure.get());
        assertTrue("reader thread did not consume all data within 30 seconds", finished);
    }

    /** Rethrows a failure captured on another thread; does nothing for {@code null}. */
    private static void rethrowIfFailed(Throwable failure) throws Exception {
        if (failure == null) {
            return;
        }
        if (failure instanceof Exception) {
            throw (Exception) failure;
        }
        if (failure instanceof Error) {
            throw (Error) failure;
        }
        throw new RuntimeException("reader thread failed", failure);
    }
}