1 package net.walend.somnifugi.test; 2 3 import java.util.List ; 4 import java.util.ArrayList ; 5 import java.util.Hashtable ; 6 7 import javax.naming.InitialContext ; 8 import javax.naming.Context ; 9 import javax.naming.NamingException ; 10 11 import javax.jms.QueueConnectionFactory ; 12 import javax.jms.QueueConnection ; 13 import javax.jms.Queue ; 14 import javax.jms.QueueSession ; 15 import javax.jms.QueueSender ; 16 import javax.jms.Message ; 17 import javax.jms.ObjectMessage ; 18 import javax.jms.QueueReceiver ; 19 import javax.jms.Session ; 20 import javax.jms.JMSException ; 21 import javax.jms.MessageListener ; 22 import javax.jms.QueueRequestor ; 23 import javax.jms.Topic ; 24 import javax.jms.TopicConnection ; 25 import javax.jms.TopicConnectionFactory ; 26 import javax.jms.TopicPublisher ; 27 import javax.jms.TopicSession ; 28 import javax.jms.TopicSubscriber ; 29 30 import junit.framework.TestSuite; 31 import junit.framework.Test; 32 33 import net.walend.toolkit.junit.TestCase; 34 35 41 42 public class AnonymousProducerTest 43 extends TestCase 44 { 45 public AnonymousProducerTest(String testName) 46 { 47 super(testName); 48 } 49 50 51 protected class ObjectSender 52 implements Runnable 53 { 54 private int delay; 55 private QueueSession session; 56 private QueueSender sender; 57 private Queue q; 58 private boolean stopped = false; 59 private Exception exception = null; 60 61 public ObjectSender(QueueSession session, QueueSender sender, Queue q,int delay) 62 { 63 this.sender = sender; 64 this.session = session; 65 this.delay = delay; 66 this.q = q; 67 } 68 69 public void run() 70 { 71 try 72 { 73 int i = 0; 74 while( !stopped ) 75 { 76 byte[] bytes = new byte[10000]; 77 if ( i++ % 100 == 0 ) 78 { 79 System.out.println("Sent : " + i ); 80 } 81 Message message = session.createObjectMessage(bytes); 82 sender.send(q, message); 83 if ( delay > 0 ) 84 { 85 Thread.currentThread().sleep(delay); 86 } 87 } 88 } 89 catch(Exception e) 90 { 91 exception = e; 92 } 93 } 94 95 public void stop() 96 { 97 this.stopped = true; 98 } 99 100 public Exception getException() 101 { 102 return exception; 103 } 104 } 105 106 protected class ObjectPublisher 107 implements Runnable 108 { 109 private int delay; 110 private TopicSession session; 111 private TopicPublisher sender; 112 private Topic t; 113 private boolean stopped = false; 114 private Exception exception = null; 115 116 public ObjectPublisher(TopicSession session, TopicPublisher sender, Topic t, int delay) 117 { 118 this.sender = sender; 119 this.session = session; 120 this.delay = delay; 121 this.t = t; 122 } 123 124 public void run() 125 { 126 try 127 { 128 int i = 0; 129 while( !stopped ) 130 { 131 byte[] bytes = new byte[10000]; 132 if ( i++ % 100 == 0 ) 133 { 134 System.out.println("Sent : " + i ); 135 } 136 Message message = session.createObjectMessage(bytes); 137 sender.publish(t, message); 138 if ( delay > 0 ) 139 { 140 Thread.currentThread().sleep(delay); 141 } 142 } 143 } 144 catch(Exception e) 145 { 146 exception = e; 147 } 148 } 149 150 public void stop() 151 { 152 this.stopped = true; 153 } 154 155 public Exception getException() 156 { 157 return exception; 158 } 159 } 160 161 protected class ObjectReceiver 162 implements Runnable 163 { 164 private QueueSession session; 165 private QueueReceiver receiver; 166 private long delay; 167 private boolean stopped = false; 168 private Exception exception = null; 169 170 public ObjectReceiver(QueueSession session,QueueReceiver receiver,long delay) 171 { 172 this.session = session; 173 this.receiver = receiver; 174 this.delay = delay; 175 } 176 177 public void run() 178 { 179 try 180 { 181 int i = 0; 182 while( !stopped ) 183 { 184 ObjectMessage message = (ObjectMessage )receiver.receive(100); 185 if ( message == null ) 186 { 187 System.out.println("Receive timeout"); 188 } 189 else 190 { 191 if ( i++ % 100 == 0 ) 192 { 193 System.out.println("Received : " + i ); 194 } 195 if ( delay > 0 ) 196 { 197 Thread.currentThread().sleep(delay); 198 } 199 } 200 } 201 } 202 catch(Exception e) 203 { 204 exception = e; 205 } 206 } 207 208 public void stop() 209 { 210 this.stopped = true; 211 } 212 213 public Exception getException() 214 { 215 return exception; 216 } 217 } 218 219 protected class ObjectSubscriber 220 implements Runnable 221 { 222 private TopicSession session; 223 private TopicSubscriber receiver; 224 private long delay; 225 private boolean stopped = false; 226 private Exception exception = null; 227 228 public ObjectSubscriber(TopicSession session,TopicSubscriber receiver,long delay) 229 { 230 this.session = session; 231 this.receiver = receiver; 232 this.delay = delay; 233 } 234 235 public void run() 236 { 237 try 238 { 239 int i = 0; 240 while( !stopped ) 241 { 242 ObjectMessage message = (ObjectMessage )receiver.receive(100); 243 if ( message == null ) { 244 System.out.println("Receive timeout"); 245 } 246 else 247 { 248 if ( i++ % 100 == 0 ) 249 { 250 System.out.println("Received : " + i ); 251 } 252 if ( delay > 0 ) 253 { 254 Thread.currentThread().sleep(delay); 255 } 256 } 257 } 258 } 259 catch(Exception e) 260 { 261 exception = e; 262 } 263 } 264 265 public void stop() 266 { 267 this.stopped = true; 268 } 269 270 public Exception getException() 271 { 272 return exception; 273 } 274 } 275 276 277 public void testQueueAnonymousSend() 278 { 279 try 280 { 281 Hashtable <String ,String > env = new Hashtable <String ,String >(11); 283 284 env.put(Context.INITIAL_CONTEXT_FACTORY,"net.walend.somnifugi.SomniQueueContextFactory"); 285 env.put(Context.PROVIDER_URL,"<not-used>"); 286 env.put("default.capacity", "1000"); 287 env.put("default.timeout", "10000"); 288 env.put("testAnonymousQueue1.ChannelFactoryClassName", "net.walend.somnifugi.TimeoutChannelFactory"); 289 env.put("wrapped-testAnonymousQueue1.ChannelFactoryClassName", "net.walend.somnifugi.juc.SimpleChannelFactory"); 290 env.put("testAnonymousQueue2.ChannelFactoryClassName", "net.walend.somnifugi.TimeoutChannelFactory"); 291 env.put("wrapped-testAnonymousQueue2.ChannelFactoryClassName", "net.walend.somnifugi.juc.SimpleChannelFactory"); 292 293 Context ctx = new InitialContext (env); 295 QueueConnection connection = (QueueConnection )ctx.lookup("Connection"); 296 connection.start(); 297 Queue queue1 = (Queue )ctx.lookup("testAnonymousQueue1"); 298 Queue queue2 = (Queue )ctx.lookup("testAnonymousQueue2"); 299 300 QueueSession session = connection.createQueueSession(false,Session.AUTO_ACKNOWLEDGE); 301 QueueSender sender = session.createSender(queue1); 302 303 QueueSession session2 = connection.createQueueSession(false,Session.AUTO_ACKNOWLEDGE); 304 QueueReceiver receiver = session2.createReceiver(queue2); 305 306 ObjectSender senderHelper = new ObjectSender(session, sender, queue2, 0); 307 Thread sendThread = new Thread (senderHelper); 308 sendThread.start(); 309 310 ObjectReceiver receiverHelper = new ObjectReceiver(session2,receiver,100); 311 Thread receiveThread = new Thread (receiverHelper); 312 receiveThread.start(); 313 314 Thread.currentThread().sleep(60000); 315 senderHelper.stop(); 316 receiverHelper.stop(); 317 sendThread.join(1000); 318 receiveThread.join(1000); 319 if ( senderHelper.getException() != null ) 320 { 321 fail(senderHelper.getException()); 322 } 323 if ( receiverHelper.getException() != null ) 324 { 325 fail(receiverHelper.getException()); 326 } 327 328 sender.close(); 329 receiver.close(); 330 session.close(); 331 session2.close(); 332 connection.close(); 333 } 334 catch(Exception e) 335 { 336 fail(e); 337 } 338 } 339 340 341 342 public void testTopicAnonymousSend() 343 { 344 try 345 { 346 Hashtable <String ,String > env = new Hashtable <String ,String >(11); 348 349 env.put(Context.INITIAL_CONTEXT_FACTORY,"net.walend.somnifugi.SomniTopicContextFactory"); 350 env.put(Context.PROVIDER_URL,"<not-used>"); 351 env.put("default.capacity", "1000"); 352 env.put("default.timeout", "10000"); 353 env.put("testAnonymousTopic1.ChannelFactoryClassName", "net.walend.somnifugi.TimeoutChannelFactory"); 354 env.put("wrapped-testAnonymousTopic1.ChannelFactoryClassName", "net.walend.somnifugi.juc.SimpleChannelFactory"); 355 env.put("testAnonymousTopic2.ChannelFactoryClassName", "net.walend.somnifugi.TimeoutChannelFactory"); 356 env.put("wrapped-testAnonymousTopic2.ChannelFactoryClassName", "net.walend.somnifugi.juc.SimpleChannelFactory"); 357 358 Context ctx = new InitialContext (env); 360 TopicConnection connection = (TopicConnection )ctx.lookup("Connection"); 361 connection.start(); 362 Topic topic1 = (Topic )ctx.lookup("testAnonymousTopic1"); 363 Topic topic2 = (Topic )ctx.lookup("testAnonymousTopic2"); 364 365 TopicSession session = connection.createTopicSession(false,Session.AUTO_ACKNOWLEDGE); 366 TopicPublisher sender = session.createPublisher(topic1); 367 368 TopicSession session2 = connection.createTopicSession(false,Session.AUTO_ACKNOWLEDGE); 369 TopicSubscriber subscriber = session2.createSubscriber(topic2); 370 371 ObjectPublisher senderHelper = new ObjectPublisher(session, sender,topic2, 0); 372 Thread sendThread = new Thread (senderHelper); 373 sendThread.start(); 374 375 ObjectSubscriber receiverHelper = new ObjectSubscriber(session2,subscriber,100); 376 Thread receiveThread = new Thread (receiverHelper); 377 receiveThread.start(); 378 379 Thread.currentThread().sleep(60000); 380 senderHelper.stop(); 381 receiverHelper.stop(); 382 sendThread.join(1000); 383 receiveThread.join(1000); 384 if ( senderHelper.getException() != null ) 385 { 386 fail(senderHelper.getException()); 387 } 388 if ( receiverHelper.getException() != null ) 389 { 390 fail(receiverHelper.getException()); 391 } 392 393 sender.close(); 394 subscriber.close(); 395 session.close(); 396 session2.close(); 397 connection.close(); 398 } 399 catch(Exception e) 400 { 401 fail(e); 402 } 403 } 404 405 406 public static Test suite() 407 { 408 TestSuite suite = new TestSuite() ; 409 410 suite.addTest(new AnonymousProducerTest("testQueueAnonymousSend")); 411 suite.addTest(new AnonymousProducerTest("testTopicAnonymousSend")); 412 413 return suite; 414 } 415 } 416 417 418 438 439 440 | Popular Tags |