KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jboss > test > jbossmq > test > SessionCloseStressTestCase


/*
 * JBoss, Home of Professional Open Source
 * Copyright 2005, JBoss Inc., and individual contributors as indicated
 * by the @authors tag. See the copyright.txt in the distribution for a
 * full listing of individual contributors.
 *
 * This is free software; you can redistribute it and/or modify it
 * under the terms of the GNU Lesser General Public License as
 * published by the Free Software Foundation; either version 2.1 of
 * the License, or (at your option) any later version.
 *
 * This software is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public
 * License along with this software; if not, write to the Free
 * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
 * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
 */

package org.jboss.test.jbossmq.test;

import java.net.URL;
import java.util.Random;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.naming.Context;

import junit.framework.Test;

import org.jboss.test.JBossTestCase;
41
/**
 * Tests for receiving while closing the session
 *
 * @author <a HREF="mailto:adrian@jboss.org">Adrian Brock</a>
 * @version <tt>$Revision: 42018 $</tt>
 */

48 public class SessionCloseStressTestCase extends JBossTestCase
49 {
50    static String JavaDoc QUEUE_FACTORY = "ConnectionFactory";
51    static String JavaDoc QUEUE = "queue/testQueue";
52
53    QueueConnection JavaDoc queueConnection;
54    Queue JavaDoc queue;
55    
56    public SessionCloseStressTestCase(String JavaDoc name) throws Exception JavaDoc
57    {
58       super(name);
59    }
60
61    public abstract class TestRunnable implements Runnable JavaDoc
62    {
63       public Throwable JavaDoc error = null;
64       
65       public abstract void doRun() throws Exception JavaDoc;
66       
67       public void run()
68       {
69          try
70          {
71             doRun();
72          }
73          catch (Throwable JavaDoc t)
74          {
75             log.error("Error in " + Thread.currentThread(), t);
76             error = t;
77          }
78       }
79    }
80    
81    public class SessionRunnable extends TestRunnable
82    {
83       MessageConsumer JavaDoc consumer;
84       
85       int received = 0;
86       
87       public void doRun() throws Exception JavaDoc
88       {
89          QueueSession JavaDoc session = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
90          MessageProducer JavaDoc producer = session.createProducer(queue);
91          for (int i = 0; i < getIterationCount(); ++i)
92          {
93             Message JavaDoc message = session.createTextMessage("" + i);
94             producer.send(message);
95          }
96          producer.close();
97          consumer = session.createConsumer(queue);
98          waitForMessages();
99          session.close();
100       }
101       
102       public synchronized MessageConsumer JavaDoc getConsumer() throws Exception JavaDoc
103       {
104          while (true)
105          {
106             if (consumer != null)
107                return consumer;
108             wait();
109          }
110       }
111       
112       public synchronized void incReceived()
113       {
114          ++received;
115          notifyAll();
116       }
117       
118       public synchronized void waitForMessages() throws Exception JavaDoc
119       {
120          notifyAll();
121          int target = new Random JavaDoc().nextInt(getIterationCount());
122          while (received < target)
123             wait();
124       }
125    }
126    
127    public class ReceiverRunnable extends TestRunnable
128    {
129       SessionRunnable sessionRunnable;
130       
131       public ReceiverRunnable(SessionRunnable sessionRunnable)
132       {
133          this.sessionRunnable = sessionRunnable;
134       }
135       
136       public void doRun() throws Exception JavaDoc
137       {
138          MessageConsumer JavaDoc consumer = sessionRunnable.getConsumer();
139          try
140          {
141             while (true)
142             {
143                consumer.receive();
144                sessionRunnable.incReceived();
145             }
146          }
147          catch (JMSException JavaDoc expected)
148          {
149             if (expected.getMessage().indexOf("closed") == -1)
150                throw expected;
151          }
152       }
153    }
154    
155    public class ReceiverNoWaitRunnable extends TestRunnable
156    {
157       SessionRunnable sessionRunnable;
158       
159       public ReceiverNoWaitRunnable(SessionRunnable sessionRunnable)
160       {
161          this.sessionRunnable = sessionRunnable;
162       }
163       
164       public void doRun() throws Exception JavaDoc
165       {
166          MessageConsumer JavaDoc consumer = sessionRunnable.getConsumer();
167          try
168          {
169             while (true)
170             {
171                if (consumer.receiveNoWait() != null)
172                   sessionRunnable.incReceived();
173             }
174          }
175          catch (JMSException JavaDoc expected)
176          {
177             if (expected.getMessage().indexOf("closed") == -1)
178                throw expected;
179          }
180       }
181    }
182    
183    public class ReceiverMessageListenerRunnable extends TestRunnable implements MessageListener JavaDoc
184    {
185       SessionRunnable sessionRunnable;
186       
187       public ReceiverMessageListenerRunnable(SessionRunnable sessionRunnable)
188       {
189          this.sessionRunnable = sessionRunnable;
190       }
191       
192       public void onMessage(Message JavaDoc message)
193       {
194          sessionRunnable.incReceived();
195       }
196       
197       public void doRun() throws Exception JavaDoc
198       {
199          MessageConsumer JavaDoc consumer = sessionRunnable.getConsumer();
200          try
201          {
202             consumer.setMessageListener(this);
203          }
204          catch (JMSException JavaDoc expected)
205          {
206             if (expected.getMessage().indexOf("closed") == -1)
207                throw expected;
208          }
209       }
210    }
211    
212    public void testSessionCloseCompetesWithReceive() throws Exception JavaDoc
213    {
214       connect();
215       try
216       {
217          for (int i = 0; i < getThreadCount(); ++i)
218          {
219             SessionRunnable sessionRunnable = new SessionRunnable();
220             Thread JavaDoc sessionThread = new Thread JavaDoc(sessionRunnable, "Session");
221             Thread JavaDoc consumerThread = new Thread JavaDoc(new ReceiverRunnable(sessionRunnable), "Consumer");
222             consumerThread.start();
223             sessionThread.start();
224             sessionThread.join();
225             consumerThread.join();
226             assertNull(sessionRunnable.error);
227
228             // Drain the queue
229
QueueSession JavaDoc session = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
230             MessageConsumer JavaDoc consumer = session.createConsumer(queue);
231             while (consumer.receiveNoWait() != null);
232             session.close();
233          }
234       }
235       finally
236       {
237          disconnect();
238       }
239    }
240    
241    public void testSessionCloseCompetesWithReceiveNoWait() throws Exception JavaDoc
242    {
243       connect();
244       try
245       {
246          for (int i = 0; i < getThreadCount(); ++i)
247          {
248             SessionRunnable sessionRunnable = new SessionRunnable();
249             Thread JavaDoc sessionThread = new Thread JavaDoc(sessionRunnable, "Session");
250             Thread JavaDoc consumerThread = new Thread JavaDoc(new ReceiverNoWaitRunnable(sessionRunnable), "Consumer");
251             consumerThread.start();
252             sessionThread.start();
253             sessionThread.join();
254             consumerThread.join();
255             assertNull(sessionRunnable.error);
256
257             // Drain the queue
258
QueueSession JavaDoc session = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
259             MessageConsumer JavaDoc consumer = session.createConsumer(queue);
260             while (consumer.receiveNoWait() != null);
261             session.close();
262          }
263       }
264       finally
265       {
266          disconnect();
267       }
268    }
269    
270    public void testSessionCloseCompetesWithMessageListener() throws Exception JavaDoc
271    {
272       connect();
273       try
274       {
275          for (int i = 0; i < getThreadCount(); ++i)
276          {
277             SessionRunnable sessionRunnable = new SessionRunnable();
278             Thread JavaDoc sessionThread = new Thread JavaDoc(sessionRunnable, "Session");
279             Thread JavaDoc consumerThread = new Thread JavaDoc(new ReceiverMessageListenerRunnable(sessionRunnable), "Consumer");
280             consumerThread.start();
281             sessionThread.start();
282             sessionThread.join();
283             consumerThread.join();
284             assertNull(sessionRunnable.error);
285
286             // Drain the queue
287
QueueSession JavaDoc session = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
288             MessageConsumer JavaDoc consumer = session.createConsumer(queue);
289             while (consumer.receiveNoWait() != null);
290             session.close();
291          }
292       }
293       finally
294       {
295          disconnect();
296       }
297    }
298
299    protected void connect() throws Exception JavaDoc
300    {
301       Context JavaDoc context = getInitialContext();
302       queue = (Queue JavaDoc) context.lookup(QUEUE);
303       QueueConnectionFactory JavaDoc queueFactory = (QueueConnectionFactory JavaDoc) context.lookup(QUEUE_FACTORY);
304       queueConnection = queueFactory.createQueueConnection();
305       queueConnection.start();
306
307       getLog().debug("Connection established.");
308    }
309
310    protected void disconnect()
311    {
312       try
313       {
314          if (queueConnection != null)
315             queueConnection.close();
316       }
317       catch (Exception JavaDoc ignored)
318       {
319       }
320
321       getLog().debug("Connection closed.");
322    }
323
324    public static Test suite() throws Exception JavaDoc
325    {
326        ClassLoader JavaDoc loader = Thread.currentThread().getContextClassLoader();
327        return getDeploySetup(SessionCloseStressTestCase.class,
328                loader.getResource("messaging/test-destinations-service.xml").toString());
329    }
330 }
331
Popular Tags