1 package net.walend.jmsbridge.test; 2 3 import java.util.List ; 4 import java.util.ArrayList ; 5 6 import javax.jms.TopicConnection ; 7 import javax.jms.Topic ; 8 import javax.jms.TopicSession ; 9 import javax.jms.TopicPublisher ; 10 import javax.jms.Message ; 11 import javax.jms.ObjectMessage ; 12 import javax.jms.TopicSubscriber ; 13 import javax.jms.Session ; 14 import javax.jms.JMSException ; 15 import javax.jms.MessageListener ; 16 17 import junit.framework.TestSuite; 18 import junit.framework.Test; 19 20 import net.walend.toolkit.junit.TestCase; 21 22 import net.walend.somnifugi.SomniJNDIBypass; 23 import net.walend.somnifugi.SomniDefaultExceptionListener; 24 25 import net.walend.jmsbridge.TopicBridge; 26 27 32 33 public class TopicTest extends TestCase 34 { 35 public TopicTest(String testName) 36 { 37 super(testName); 38 } 39 40 protected class TenObjectPublisher 41 implements Runnable 42 { 43 private TopicConnection connection; 44 private Topic topic; 45 46 public TenObjectPublisher(TopicConnection connection,Topic topic) 47 { 48 this.connection = connection; 49 this.topic = topic; 50 } 51 52 public void run() 53 { 54 try 55 { 56 Thread.sleep(10); 57 TopicSession session = connection.createTopicSession(false,Session.AUTO_ACKNOWLEDGE); 58 TopicPublisher publisher = session.createPublisher(topic); 59 for(int i=0;i<10;i++) 60 { 61 Message message = session.createObjectMessage(new Integer (i)); 62 publisher.publish(message); 63 } 64 } 65 catch(InterruptedException ie) 66 { 67 fail(ie); 68 } 69 catch(JMSException jmse) 70 { 71 fail(jmse); 72 } 73 } 74 } 75 76 protected class TenObjectSubscriber 77 implements Runnable 78 { 79 private TopicConnection connection; 80 private Topic topic; 81 private List <Object > expected = new ArrayList <Object >(); 82 private List <Object > results = new ArrayList <Object >(); 83 private final Object guard = new Object (); 84 85 86 public TenObjectSubscriber(TopicConnection connection,Topic topic) 87 { 88 this.connection = connection; 89 this.topic = topic; 90 for(int i=0;i<10;i++) 91 { 92 expected.add(new Integer (i)); 93 } 94 } 95 96 public void test() 97 { 98 synchronized(guard) 99 { 100 assertTrue("Expected results to be "+expected+", but got "+results,results.equals(expected)); 101 } 102 } 103 104 105 public void run() 106 { 107 synchronized(guard) 108 { 109 try 110 { 111 TopicSession session = connection.createTopicSession(false,Session.AUTO_ACKNOWLEDGE); 112 TopicSubscriber subscriber = session.createSubscriber(topic); 113 114 for(int i=0;i<10;i++) 115 { 116 ObjectMessage message = (ObjectMessage )subscriber.receive(200); 117 assertTrue("Message not received after 200 ms.",message!=null); 118 119 results.add(message.getObject()); 120 } 121 } 122 catch(JMSException jmse) 123 { 124 fail(jmse); 125 } 126 } 127 } 128 } 129 130 131 132 public void testTwoThreads() 133 { 134 try 135 { 136 TopicConnection connection = SomniJNDIBypass.IT.getTopicConnectionFactory().createTopicConnection(); 137 connection.start(); 138 139 Topic bridgeIn = SomniJNDIBypass.IT.getTopic("twoThreadBridgeIn"); 140 Topic bridgeOut = SomniJNDIBypass.IT.getTopic("twoThreadBridgeOut"); 141 142 TopicSession session = connection.createTopicSession(false,Session.AUTO_ACKNOWLEDGE); 143 TopicSubscriber source = session.createSubscriber(bridgeIn); 144 TopicPublisher sink = session.createPublisher(bridgeOut); 145 146 TopicBridge bridge = new TopicBridge(source,sink,session,new SomniDefaultExceptionListener()); 148 149 TenObjectSubscriber tos = new TenObjectSubscriber(connection,bridgeOut); 150 151 Thread subscribeThread = new Thread (tos); 152 subscribeThread.start(); 153 154 Thread.sleep(10); 155 156 Thread publishThread = new Thread (new TenObjectPublisher(connection,bridgeIn)); 157 publishThread.start(); 158 159 publishThread.join(10000); 160 subscribeThread.join(10000); 161 162 tos.test(); 163 164 } 165 catch(JMSException jmse) 166 { 167 fail(jmse); 168 } 169 catch(InterruptedException ie) 170 { 171 fail(ie); 172 } 173 } 174 175 public void testTwoSubscribers() 176 { 177 try 178 { 179 TopicConnection connection = SomniJNDIBypass.IT.getTopicConnectionFactory().createTopicConnection(); 180 connection.start(); 181 182 Topic bridgeIn = SomniJNDIBypass.IT.getTopic("twoSubsBridgeIn"); 183 Topic bridgeOut = SomniJNDIBypass.IT.getTopic("twoSubsBridgeOut"); 184 185 TopicSession session = connection.createTopicSession(false,Session.AUTO_ACKNOWLEDGE); 186 TopicSubscriber source = session.createSubscriber(bridgeIn); 187 TopicPublisher sink = session.createPublisher(bridgeOut); 188 189 TopicBridge bridge = new TopicBridge(source,sink,session,new SomniDefaultExceptionListener()); 191 192 193 TenObjectSubscriber tos1 = new TenObjectSubscriber(connection,bridgeOut); 194 TenObjectSubscriber tos2 = new TenObjectSubscriber(connection,bridgeOut); 195 196 Thread subscribeThread1 = new Thread (tos1); 197 subscribeThread1.start(); 198 199 Thread.sleep(10); 200 201 Thread subscribeThread2 = new Thread (tos2); 202 subscribeThread2.start(); 203 204 Thread.sleep(10); 205 206 Thread publishThread = new Thread (new TenObjectPublisher(connection,bridgeIn)); 207 publishThread.start(); 208 209 publishThread.join(10000); 210 subscribeThread1.join(10000); 211 subscribeThread2.join(10000); 212 213 tos1.test(); 214 tos2.test(); 215 } 216 catch(JMSException jmse) 217 { 218 fail(jmse); 219 } 220 catch(InterruptedException ie) 221 { 222 fail(ie); 223 } 224 } 225 226 public static Test suite() 227 { 228 TestSuite suite = new TestSuite() ; 229 230 suite.addTest(new TopicTest("testTwoThreads")); 231 suite.addTest(new TopicTest("testTwoSubscribers")); 232 233 return suite; 234 } 235 } 236 237 238 260 261 262 | Popular Tags |