KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jboss > test > jbossmq > test > ReceiversImplStressTest


/*
* JBoss, Home of Professional Open Source
* Copyright 2005, JBoss Inc., and individual contributors as indicated
* by the @authors tag. See the copyright.txt in the distribution for a
* full listing of individual contributors.
*
* This is free software; you can redistribute it and/or modify it
* under the terms of the GNU Lesser General Public License as
* published by the Free Software Foundation; either version 2.1 of
* the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this software; if not, write to the Free
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
*/

22 package org.jboss.test.jbossmq.test;
23
24 import javax.jms.MessageConsumer JavaDoc;
25 import javax.jms.MessageProducer JavaDoc;
26 import javax.jms.ObjectMessage JavaDoc;
27 import javax.jms.Queue JavaDoc;
28 import javax.jms.QueueConnection JavaDoc;
29 import javax.jms.QueueConnectionFactory JavaDoc;
30 import javax.jms.QueueSession JavaDoc;
31 import javax.jms.Session JavaDoc;
32 import javax.naming.Context JavaDoc;
33
34 import org.jboss.test.JBossTestCase;
35
36 /**
37  * Tests for receivers impl
38  *
39  * @author <a HREF="mailto:adrian@jboss.org>Adrian Brock</a>
40  * @version <tt>$Revision: 41758 $</tt>
41  */

42 public abstract class ReceiversImplStressTest extends JBossTestCase
43 {
44    static String JavaDoc QUEUE_FACTORY = "ConnectionFactory";
45    static String JavaDoc QUEUE = "queue/ReceiversImpl";
46    static int messages = 0;
47    
48    QueueConnection JavaDoc queueConnection;
49    Queue JavaDoc queue;
50    
51    public ReceiversImplStressTest(String JavaDoc name) throws Exception JavaDoc
52    {
53       super(name);
54       messages = getThreadCount() * getBeanCount();
55    }
56
57    public abstract class TestRunnable implements Runnable JavaDoc
58    {
59       public Throwable JavaDoc error = null;
60       
61       public abstract void doRun() throws Exception JavaDoc;
62       
63       public void run()
64       {
65          try
66          {
67             doRun();
68          }
69          catch (Throwable JavaDoc t)
70          {
71             log.error("Error in " + Thread.currentThread(), t);
72             error = t;
73          }
74       }
75    }
76
77    public class ReceiverRunnable extends TestRunnable
78    {
79       int integer;
80       
81       MessageConsumer JavaDoc consumer;
82       
83       public ReceiverRunnable(int integer) throws Exception JavaDoc
84       {
85          this.integer = integer;
86          QueueSession JavaDoc session = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
87          consumer = session.createConsumer(queue);
88       }
89       
90       public void doRun() throws Exception JavaDoc
91       {
92          int count = getBeanCount();
93          while (count > 0)
94          {
95             consumer.receive();
96             --count;
97          }
98       }
99    }
100
101    public class ReceiverNoWaitRunnable extends TestRunnable
102    {
103       int integer;
104       
105       MessageConsumer JavaDoc consumer;
106       
107       public ReceiverNoWaitRunnable(int integer) throws Exception JavaDoc
108       {
109          this.integer = integer;
110          QueueSession JavaDoc session = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
111          consumer = session.createConsumer(queue);
112       }
113       
114       public void doRun() throws Exception JavaDoc
115       {
116          int count = getBeanCount();
117          while (count > 0)
118          {
119             if (consumer.receiveNoWait() != null)
120                --count;
121          }
122       }
123    }
124    
125    public void testReceiversImplReceive() throws Exception JavaDoc
126    {
127       connect();
128       try
129       {
130          ReceiverRunnable[] receivers = new ReceiverRunnable[getThreadCount()];
131          Thread JavaDoc[] consumerThreads = new Thread JavaDoc[getThreadCount()];
132          for (int i = 0; i < consumerThreads.length; ++i)
133          {
134             receivers[i] = new ReceiverRunnable(i);
135             consumerThreads[i] = new Thread JavaDoc(receivers[i], "Consumer" + i);
136          }
137          for (int i = 0; i < consumerThreads.length; ++i)
138             consumerThreads[i].start();
139
140          QueueSession JavaDoc session = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
141          MessageProducer JavaDoc producer = session.createProducer(queue);
142          for (int i = 0; i < messages; ++i)
143          {
144             ObjectMessage JavaDoc message = session.createObjectMessage(new Integer JavaDoc(i));
145             producer.send(message);
146          }
147
148          
149          for (int i = 0; i < consumerThreads.length; ++i)
150             consumerThreads[i].join();
151          for (int i = 0; i < consumerThreads.length; ++i)
152             assertNull(receivers[i].error);
153
154          // Drain the queue
155
MessageConsumer JavaDoc consumer = session.createConsumer(queue);
156          while (consumer.receiveNoWait() != null);
157       }
158       finally
159       {
160          disconnect();
161       }
162    }
163    
164    public void testReceiversImplReceiveNoWait() throws Exception JavaDoc
165    {
166       connect();
167       try
168       {
169          ReceiverNoWaitRunnable[] receivers = new ReceiverNoWaitRunnable[getThreadCount()];
170          Thread JavaDoc[] consumerThreads = new Thread JavaDoc[getThreadCount()];
171          for (int i = 0; i < consumerThreads.length; ++i)
172          {
173             receivers[i] = new ReceiverNoWaitRunnable(i);
174             consumerThreads[i] = new Thread JavaDoc(receivers[i], "Consumer" + i);
175          }
176          for (int i = 0; i < consumerThreads.length; ++i)
177             consumerThreads[i].start();
178
179          QueueSession JavaDoc session = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
180          MessageProducer JavaDoc producer = session.createProducer(queue);
181          for (int i = 0; i < messages; ++i)
182          {
183             ObjectMessage JavaDoc message = session.createObjectMessage(new Integer JavaDoc(i));
184             producer.send(message);
185          }
186
187          
188          for (int i = 0; i < consumerThreads.length; ++i)
189             consumerThreads[i].join();
190          for (int i = 0; i < consumerThreads.length; ++i)
191             assertNull(receivers[i].error);
192
193          // Drain the queue
194
MessageConsumer JavaDoc consumer = session.createConsumer(queue);
195          while (consumer.receiveNoWait() != null);
196       }
197       finally
198       {
199          disconnect();
200       }
201    }
202
203    protected void connect() throws Exception JavaDoc
204    {
205       Context JavaDoc context = getInitialContext();
206       queue = (Queue JavaDoc) context.lookup(QUEUE);
207       QueueConnectionFactory JavaDoc queueFactory = (QueueConnectionFactory JavaDoc) context.lookup(QUEUE_FACTORY);
208       queueConnection = queueFactory.createQueueConnection();
209       queueConnection.start();
210
211       getLog().debug("Connection established.");
212    }
213
214    protected void disconnect()
215    {
216       try
217       {
218          if (queueConnection != null)
219             queueConnection.close();
220       }
221       catch (Exception JavaDoc ignored)
222       {
223       }
224
225       getLog().debug("Connection closed.");
226    }
227 }
228
Popular Tags