KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > net > walend > somnifugi > test > FlowLimitTests


1 package net.walend.somnifugi.test;
2
3 import java.util.List JavaDoc;
4 import java.util.ArrayList JavaDoc;
5 import java.util.Hashtable JavaDoc;
6
7 import javax.naming.InitialContext JavaDoc;
8 import javax.naming.Context JavaDoc;
9 import javax.naming.NamingException JavaDoc;
10
11 import javax.jms.QueueConnectionFactory JavaDoc;
12 import javax.jms.QueueConnection JavaDoc;
13 import javax.jms.Queue JavaDoc;
14 import javax.jms.QueueSession JavaDoc;
15 import javax.jms.QueueSender JavaDoc;
16 import javax.jms.Message JavaDoc;
17 import javax.jms.ObjectMessage JavaDoc;
18 import javax.jms.BytesMessage JavaDoc;
19 import javax.jms.QueueReceiver JavaDoc;
20 import javax.jms.Session JavaDoc;
21 import javax.jms.JMSException JavaDoc;
22 import javax.jms.MessageListener JavaDoc;
23 import javax.jms.QueueRequestor JavaDoc;
24 import javax.jms.Topic JavaDoc;
25 import javax.jms.TopicConnection JavaDoc;
26 import javax.jms.TopicConnectionFactory JavaDoc;
27 import javax.jms.TopicPublisher JavaDoc;
28 import javax.jms.TopicSession JavaDoc;
29 import javax.jms.TopicSubscriber JavaDoc;
30
31 import junit.framework.TestSuite;
32 import junit.framework.Test;
33
34 import net.walend.toolkit.junit.TestCase;
35
36 import net.walend.somnifugi.SomniInterruptedException;
37
38 /**
39 Tests to show flow limit working
40
41 @author <a HREF="http://walend.net">David Walend</a> <a HREF="mailto:david@walend.net">david@walend.net</a>
42 @author <a HREF="http://www.jumpi.org">Peter Klauser</a> <a HREF="mailto:klp@users.sourceforge.net">klp@users.sourceforge.net</a>
43
44 */

45
46 public class FlowLimitTests extends TestCase
47 {
48     public FlowLimitTests(String JavaDoc testName)
49     {
50         super(testName);
51     }
52     
53     
54     protected class ObjectSender
55         implements Runnable JavaDoc
56     {
57         private int delay;
58         private QueueSession JavaDoc session;
59         private QueueSender JavaDoc sender;
60         private Exception JavaDoc exception = null;
61         private volatile Thread JavaDoc thread = null;
62
63         public ObjectSender(QueueSession JavaDoc session, QueueSender JavaDoc sender,int delay)
64         {
65             this.sender = sender;
66             this.session = session;
67             this.delay = delay;
68         }
69         
70         public void run()
71         {
72             thread = Thread.currentThread();
73             try
74                 {
75                     int i = 0;
76                     while(!thread.isInterrupted())
77                         {
78                             byte[] bytes = new byte[1000];
79                             if ( i++ % 100 == 0 ) {
80                                 System.out.println("Sent : " + i );
81                             }
82                             Message JavaDoc message = session.createObjectMessage(bytes);
83                             sender.send(message);
84                             if ( delay > 0 ) {
85                                 Thread.currentThread().sleep(delay);
86                             }
87                         }
88                 }
89             catch(InterruptedException JavaDoc ie)
90                 {
91                     //expected
92
}
93             catch(SomniInterruptedException ie)
94                 {
95                     //expected
96
}
97             catch(Exception JavaDoc e)
98                 {
99                     exception = e;
100                 }
101         }
102         
103         public void stop() {
104             thread.interrupt();
105         }
106         
107         public Exception JavaDoc getException() {
108             return exception;
109         }
110     }
111     
112     protected class ObjectPublisher
113         implements Runnable JavaDoc
114     {
115         private int delay;
116         private TopicSession JavaDoc session;
117         private TopicPublisher JavaDoc sender;
118         private Exception JavaDoc exception = null;
119         private volatile Thread JavaDoc thread = null;
120         
121         public ObjectPublisher(TopicSession JavaDoc session, TopicPublisher JavaDoc sender,int delay)
122         {
123             this.sender = sender;
124             this.session = session;
125             this.delay = delay;
126         }
127         
128         public void run()
129         {
130             thread = Thread.currentThread();
131             try
132                 {
133                     int i = 0;
134                     while(!thread.isInterrupted())
135                         {
136                             byte[] bytes = new byte[1000];
137                             if ( i++ % 100 == 0 ) {
138                                 System.out.println("Sent : " + i );
139                             }
140                             Message JavaDoc message = session.createObjectMessage(bytes);
141                             sender.publish(message);
142                             if ( delay > 0 ) {
143                                 Thread.currentThread().sleep(delay);
144                             }
145                         }
146                 }
147             catch(InterruptedException JavaDoc ie)
148                 {
149                     //expected
150
}
151             catch(SomniInterruptedException ie)
152                 {
153                     //expected
154
}
155             catch(Exception JavaDoc e)
156                 {
157                     exception = e;
158                 }
159         }
160         
161         public void stop() {
162             thread.interrupt();
163         }
164         
165         public Exception JavaDoc getException() {
166             return exception;
167         }
168     }
169     
170     protected class ObjectReceiver
171         implements Runnable JavaDoc
172     {
173         private QueueSession JavaDoc session;
174         private QueueReceiver JavaDoc receiver;
175         private long delay;
176         private Exception JavaDoc exception = null;
177         private volatile Thread JavaDoc thread = null;
178         
179         public ObjectReceiver(QueueSession JavaDoc session,QueueReceiver JavaDoc receiver,long delay)
180         {
181             this.session = session;
182             this.receiver = receiver;
183             this.delay = delay;
184         }
185         
186         public void run()
187         {
188             thread = Thread.currentThread();
189             try
190                 {
191                     int i = 0;
192                     while(!thread.isInterrupted()){
193                         ObjectMessage JavaDoc message = (ObjectMessage JavaDoc)receiver.receive(100);
194                         if ( message == null ) {
195                             System.out.println("Receive timeout");
196                         } else {
197                             if ( i++ % 100 == 0 ) {
198                                 System.out.println("Received : " + i );
199                             }
200                             if ( delay > 0 ) {
201                                 Thread.currentThread().sleep(delay);
202                             }
203                         }
204                     }
205                 }
206             catch(InterruptedException JavaDoc ie)
207                 {
208                     //expected
209
}
210             catch(SomniInterruptedException ie)
211                 {
212                     //expected
213
}
214             catch(Exception JavaDoc e)
215                 {
216                     exception = e;
217                 }
218         }
219         
220         public void stop() {
221             thread.interrupt();
222         }
223         
224         public Exception JavaDoc getException() {
225             return exception;
226         }
227     }
228     
229     protected class ObjectSubscriber
230         implements Runnable JavaDoc
231     {
232         private TopicSession JavaDoc session;
233         private TopicSubscriber JavaDoc receiver;
234         private long delay;
235         private Exception JavaDoc exception = null;
236         private volatile Thread JavaDoc thread = null;
237         
238         public ObjectSubscriber(TopicSession JavaDoc session,TopicSubscriber JavaDoc receiver,long delay)
239         {
240             this.session = session;
241             this.receiver = receiver;
242             this.delay = delay;
243         }
244         
245         public void run()
246         {
247             thread = Thread.currentThread();
248             try
249                 {
250                     int i = 0;
251                     while(!thread.isInterrupted()){
252                         ObjectMessage JavaDoc message = (ObjectMessage JavaDoc)receiver.receive(100);
253                         if ( message == null ) {
254                             System.out.println("Receive timeout");
255                         } else {
256                             if ( i++ % 100 == 0 ) {
257                                 System.out.println("Received : " + i );
258                             }
259                             if ( delay > 0 ) {
260                                 Thread.currentThread().sleep(delay);
261                             }
262                         }
263                     }
264                 }
265             catch(InterruptedException JavaDoc ie)
266                 {
267                     //expected
268
}
269             catch(SomniInterruptedException ie)
270                 {
271                     //expected
272
}
273             catch(Exception JavaDoc e)
274                 {
275                     exception = e;
276                 }
277         }
278         
279         public void stop() {
280             thread.interrupt();
281         }
282         
283         public Exception JavaDoc getException() {
284             return exception;
285         }
286     }
287     
288     protected class TestMessageListener
289         implements MessageListener JavaDoc
290     {
291         int i = 0;
292         long delay;
293         
294         protected TestMessageListener(long delay)
295         {
296             this.delay = delay;
297         }
298         
299         public void onMessage(Message JavaDoc message)
300         {
301             ObjectMessage JavaDoc om = (ObjectMessage JavaDoc)message;
302             
303             if ( i++ % 100 == 0 ) {
304                 System.out.println("OnMessage : " + i );
305             }
306             try
307                 {
308                     Thread.sleep(delay);
309                 }
310             catch(InterruptedException JavaDoc ie)
311                 {
312                     Thread.currentThread().interrupt();
313                 }
314         }
315     }
316     
317     
318     public void testOverflow()
319     {
320         try
321             {
322                 //set things up in JNDI
323
Hashtable JavaDoc<String JavaDoc,String JavaDoc> env = new Hashtable JavaDoc<String JavaDoc,String JavaDoc>(11);
324                 
325                 env.put(Context.INITIAL_CONTEXT_FACTORY,"net.walend.somnifugi.SomniQueueContextFactory");
326                 env.put(Context.PROVIDER_URL,"<not-used>");
327                 /* when the capacity is not set, then no limit is set on the entries
328                  * making it possible to cause an out-of-memory exception
329                 env.put("queue.capacity", "100");
330                  env.put("topic.capacity", "100");
331                  env.put("queue.blockinginterval","1000");
332                  env.put("topic.blockinginterval","1000");
333                 */

334                 
335                 // Create the initial context
336
Context JavaDoc ctx = new InitialContext JavaDoc(env);
337                 QueueConnection JavaDoc connection = (QueueConnection JavaDoc)ctx.lookup("Connection");
338                 connection.start();
339                 Queue JavaDoc queue = (Queue JavaDoc)ctx.lookup("testOverflowQueue");
340                 
341                 QueueSession JavaDoc session = connection.createQueueSession(false,Session.AUTO_ACKNOWLEDGE);
342                 QueueSender JavaDoc sender = session.createSender(queue);
343                 
344                 ObjectSender senderHelper = new ObjectSender(session, sender,0);
345                 Thread JavaDoc sendThread = new Thread JavaDoc(senderHelper);
346                 sendThread.start();
347                 
348                 Thread.currentThread().sleep(30000);
349                 senderHelper.stop();
350                 sendThread.join(1000);
351                 if ( senderHelper.getException() != null ) {
352                     fail(senderHelper.getException());
353                 }
354                 
355                 sender.close();
356                 session.close();
357                 connection.close();
358             }
359         catch(Exception JavaDoc e)
360             {
361                 fail(e);
362             }
363     }
364     
365     public void testQueueFlowlimit()
366     {
367         try
368             {
369                 //set things up in JNDI
370
Hashtable JavaDoc<String JavaDoc,String JavaDoc> env = new Hashtable JavaDoc<String JavaDoc,String JavaDoc>(11);
371                 
372                 env.put(Context.INITIAL_CONTEXT_FACTORY,"net.walend.somnifugi.SomniQueueContextFactory");
373                 env.put(Context.PROVIDER_URL,"<not-used>");
374                 env.put("testFlowLimitQueue.ChannelFactoryClassName", "net.walend.somnifugi.TimeoutChannelFactory");
375                 env.put("testFlowLimitQueue.timeout", "1000");
376                 env.put("wrapped-testFlowLimitQueue.ChannelFactoryClassName", "net.walend.somnifugi.juc.SimpleChannelFactory");
377                 env.put("testFlowLimitQueue.capacity", "100");
378
379                 // Create the initial context
380
Context JavaDoc ctx = new InitialContext JavaDoc(env);
381                 QueueConnection JavaDoc connection = (QueueConnection JavaDoc)ctx.lookup("Connection");
382                 connection.start();
383                 Queue JavaDoc queue = (Queue JavaDoc)ctx.lookup("testFlowLimitQueue");
384                 
385                 QueueSession JavaDoc session = connection.createQueueSession(false,Session.AUTO_ACKNOWLEDGE);
386                 QueueSender JavaDoc sender = session.createSender(queue);
387                 
388                 QueueSession JavaDoc session2 = connection.createQueueSession(false,Session.AUTO_ACKNOWLEDGE);
389                 QueueReceiver JavaDoc receiver = session2.createReceiver(queue);
390                 
391                 ObjectSender senderHelper = new ObjectSender(session, sender,0);
392                 Thread JavaDoc sendThread = new Thread JavaDoc(senderHelper);
393                 sendThread.start();
394                 
395                 ObjectReceiver receiverHelper = new ObjectReceiver(session2,receiver,100);
396                 Thread JavaDoc receiveThread = new Thread JavaDoc(receiverHelper);
397                 receiveThread.start();
398                 
399                 Thread.currentThread().sleep(30000);
400                 senderHelper.stop();
401                 receiverHelper.stop();
402                 sendThread.join(1000);
403                 receiveThread.join(1000);
404                 if ( senderHelper.getException() != null ) {
405                     fail(senderHelper.getException());
406                 }
407                 if ( receiverHelper.getException() != null ) {
408                     fail(receiverHelper.getException());
409                 }
410                 
411                 sender.close();
412                 receiver.close();
413                 session.close();
414                 session2.close();
415                 connection.close();
416             }
417         catch(Exception JavaDoc e)
418             {
419                 fail(e);
420             }
421     }
422     
423     
424     public void testQueueFlowlimitMessageListener()
425     {
426         try
427             {
428                 //set things up in JNDI
429
Hashtable JavaDoc<String JavaDoc,String JavaDoc> env = new Hashtable JavaDoc<String JavaDoc,String JavaDoc>(11);
430                 
431                 env.put(Context.INITIAL_CONTEXT_FACTORY,"net.walend.somnifugi.SomniQueueContextFactory");
432                 env.put(Context.PROVIDER_URL,"<not-used>");
433                 env.put("testFlowLimitQueue.ChannelFactoryClassName", "net.walend.somnifugi.TimeoutChannelFactory");
434                 env.put("testFlowLimitQueue.timeout", "1000");
435                 env.put("wrapped-testFlowLimitQueue.ChannelFactoryClassName", "net.walend.somnifugi.juc.SimpleChannelFactory");
436                 env.put("testFlowLimitQueue.capacity", "100");
437                 
438                 // Create the initial context
439
Context JavaDoc ctx = new InitialContext JavaDoc(env);
440                 QueueConnection JavaDoc connection = (QueueConnection JavaDoc)ctx.lookup("Connection");
441                 connection.start();
442                 Queue JavaDoc queue = (Queue JavaDoc)ctx.lookup("testFlowLimitQueue");
443                 
444                 QueueSession JavaDoc session = connection.createQueueSession(false,Session.AUTO_ACKNOWLEDGE);
445                 QueueSender JavaDoc sender = session.createSender(queue);
446                 
447                 TestMessageListener messageListener = new TestMessageListener(100);
448                 QueueSession JavaDoc session2 = connection.createQueueSession(false,Session.AUTO_ACKNOWLEDGE);
449                 QueueReceiver JavaDoc receiver = session2.createReceiver(queue);
450                 receiver.setMessageListener(messageListener);
451                 
452                 ObjectSender senderHelper = new ObjectSender(session, sender,0);
453                 Thread JavaDoc sendThread = new Thread JavaDoc(senderHelper);
454                 sendThread.start();
455                 
456                 Thread.currentThread().sleep(30000);
457                 senderHelper.stop();
458                 sendThread.join(1000);
459                 if ( senderHelper.getException() != null ) {
460                     fail(senderHelper.getException());
461                 }
462                 
463                 sender.close();
464                 //#pklauser says this is a bug - close deadlock caused -
465
receiver.close();
466                 session.close();
467                 session2.close();
468                 connection.close();
469             }
470         catch(Exception JavaDoc e)
471             {
472                 fail(e);
473             }
474     }
475     
476     public void testTopicFlowlimit()
477     {
478         try
479             {
480                 //set things up in JNDI
481
Hashtable JavaDoc<String JavaDoc,String JavaDoc> env = new Hashtable JavaDoc<String JavaDoc,String JavaDoc>(11);
482                 
483                 env.put(Context.INITIAL_CONTEXT_FACTORY,"net.walend.somnifugi.SomniTopicContextFactory");
484                 env.put(Context.PROVIDER_URL,"<not-used>");
485                 env.put("testFlowLimitTopic.ChannelFactoryClassName", "net.walend.somnifugi.TimeoutChannelFactory");
486                 env.put("testFlowLimitTopic.timeout", "1000");
487                 env.put("wrapped-testFlowLimitTopic.ChannelFactoryClassName", "net.walend.somnifugi.juc.SimpleChannelFactory");
488                 env.put("testFlowLimitTopic.capacity", "100");
489
490                 // Create the initial context
491
Context JavaDoc ctx = new InitialContext JavaDoc(env);
492                 TopicConnection JavaDoc connection = (TopicConnection JavaDoc)ctx.lookup("Connection");
493                 connection.start();
494                 Topic JavaDoc topic = (Topic JavaDoc)ctx.lookup("testFlowLimitTopic");
495                 
496                 TopicSession JavaDoc session = connection.createTopicSession(false,Session.AUTO_ACKNOWLEDGE);
497                 TopicPublisher JavaDoc sender = session.createPublisher(topic);
498                 
499                 TopicSession JavaDoc session2 = connection.createTopicSession(false,Session.AUTO_ACKNOWLEDGE);
500                 TopicSubscriber JavaDoc subscriber = session2.createSubscriber(topic);
501                 
502                 ObjectPublisher senderHelper = new ObjectPublisher(session, sender,0);
503                 Thread JavaDoc sendThread = new Thread JavaDoc(senderHelper);
504                 sendThread.start();
505                 
506                 ObjectSubscriber receiverHelper = new ObjectSubscriber(session2,subscriber,100);
507                 Thread JavaDoc receiveThread = new Thread JavaDoc(receiverHelper);
508                 receiveThread.start();
509                 
510                 Thread.currentThread().sleep(30000);
511                 senderHelper.stop();
512                 receiverHelper.stop();
513                 sendThread.join(1000);
514                 receiveThread.join(1000);
515                 if ( senderHelper.getException() != null ) {
516                     fail(senderHelper.getException());
517                 }
518                 if ( receiverHelper.getException() != null ) {
519                     fail(receiverHelper.getException());
520                 }
521                 
522                 sender.close();
523                 //#pklauser says this is a bug - deadlock -
524
subscriber.close();
525                 session.close();
526                 session2.close();
527                 connection.close();
528             }
529         catch(Exception JavaDoc e)
530             {
531                 fail(e);
532             }
533     }
534     
535     
536     public static Test suite()
537     {
538         TestSuite suite = new TestSuite() ;
539         
540         //to show memory
541
//keep this turned off -- it just demonstrates the problem suite.addTest(new FlowLimitTests("testOverflow"));
542

543         //to show flow limit in action
544
suite.addTest(new FlowLimitTests("testQueueFlowlimit"));
545         suite.addTest(new FlowLimitTests("testTopicFlowlimit"));
546         suite.addTest(new FlowLimitTests("testQueueFlowlimitMessageListener"));
547         
548         return suite;
549     }
550 }
551
552
553 /*Copyright (c) 2001, 2002, 2003, 2004, 2005, 2006 David Walend
554 All rights reserved.
555
556 Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following conditions are met:
557
558 Redistributions of source code must retain the above copyright notice, this list of conditions and the following disclaimer.
559
560 Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following disclaimer in the documentation and/or other materials provided with the distribution.
561
562 Neither the name of the SomnifugiJMS Project, walend.net, nor the names of its contributors may be used to endorse or promote products derived from this software without specific prior written permission from David Walend.
563
564 Credits in redistributions in source or binary forms must include a link to http://somnifugi.sourceforge.net .
565
566 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
567 The net.walend.somnifugi.sql92 package is modified code from the openmq project, https://mq.dev.java.net/ , Copyright (c) of Sun, and carries the CDDL license, repeated here: You can obtain a copy of the license at https://glassfish.dev.java.net/public/CDDLv1.0.html. See the License for the specific language governing permissions and limitations under the License.
568
569 =================================================================================
570
571 For more information and the latest version of this software, please see http://somnifugi.sourceforge.net and http://walend.net or email <a HREF="mailto:david@walend.net">david@walend.net</a>.
572 */

573
574
575
Popular Tags