KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > activemq > usecases > PublishOnQueueConsumedMessageInTransactionTest


1 /**
2  *
3  * Licensed to the Apache Software Foundation (ASF) under one or more
4  * contributor license agreements. See the NOTICE file distributed with
5  * this work for additional information regarding copyright ownership.
6  * The ASF licenses this file to You under the Apache License, Version 2.0
7  * (the "License"); you may not use this file except in compliance with
8  * the License. You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */

18 package org.apache.activemq.usecases;
19
20 import junit.framework.TestCase;
21 import org.apache.activemq.ActiveMQConnectionFactory;
22 import org.apache.activemq.util.IOHelper;
23 import org.apache.activemq.command.ActiveMQQueue;
24 import org.apache.commons.logging.Log;
25 import org.apache.commons.logging.LogFactory;
26
27 import javax.jms.*;
28 import java.util.List JavaDoc;
29 import java.util.Collections JavaDoc;
30 import java.util.ArrayList JavaDoc;
31 import java.io.File JavaDoc;
32
33
34 public final class PublishOnQueueConsumedMessageInTransactionTest extends TestCase implements MessageListener {
35
36     private static final Log log = LogFactory.getLog(PublishOnQueueConsumedMessageInTransactionTest.class);
37     
38     private Session producerSession;
39     private Session consumerSession;
40     private Destination queue;
41     private ActiveMQConnectionFactory factory;
42     private MessageProducer producer;
43     private MessageConsumer consumer;
44     private Connection connection;
45     private ObjectMessage objectMessage = null;
46     private List JavaDoc messages = createConcurrentList();
47     private final Object JavaDoc lock = new Object JavaDoc();
48     private String JavaDoc[] data;
49     private String JavaDoc DATAFILE_ROOT = IOHelper.getDefaultDataDirectory();
50     private int messageCount = 3;
51     private String JavaDoc url = "vm://localhost";
52
53     // Invalid acknowledgment warning can be viewed on the console of a remote broker
54
// The warning message is not thrown back to the client
55
//private String url = "tcp://localhost:61616";
56

57
58     protected void setUp() throws Exception JavaDoc {
59         File JavaDoc dataFile = new File JavaDoc(DATAFILE_ROOT);
60         recursiveDelete(dataFile);
61         try {
62             factory = new ActiveMQConnectionFactory(url);
63             connection = factory.createConnection();
64             producerSession = connection.createSession(true, Session.SESSION_TRANSACTED);
65             consumerSession = connection.createSession(true, Session.SESSION_TRANSACTED);
66             queue = new ActiveMQQueue("FOO.BAR");
67             data = new String JavaDoc[messageCount];
68
69             for (int i = 0; i < messageCount; i++) {
70                 data[i] = "Message : " + i;
71             }
72         } catch (JMSException je) {
73             fail("Error setting up connection : " + je.toString());
74         }
75     }
76
77
78     public void testSendReceive() throws Exception JavaDoc {
79         sendMessage();
80
81         connection.start();
82         consumer = consumerSession.createConsumer(queue);
83         consumer.setMessageListener(this);
84         waitForMessagesToBeDelivered();
85         assertEquals("Messages received doesn't equal messages sent", messages.size(),data.length);
86
87     }
88
89
90     protected void sendMessage() throws JMSException {
91         messages.clear();
92         try {
93             for (int i = 0; i < data.length; ++i) {
94                 producer = producerSession.createProducer(queue);
95                 objectMessage = producerSession.createObjectMessage(data[i]);
96                 producer.send(objectMessage);
97                 producerSession.commit();
98                 log.info("sending message :" + objectMessage);
99             }
100         } catch (Exception JavaDoc e) {
101             if (producerSession != null) {
102                 producerSession.rollback();
103                 log.info("rollback");
104                 producerSession.close();
105             }
106
107             e.printStackTrace();
108         }
109     }
110
111
112     public synchronized void onMessage(Message m) {
113         try {
114             objectMessage = (ObjectMessage) m;
115             consumeMessage(objectMessage,messages);
116
117             log.info("consumer received message :" + objectMessage);
118             consumerSession.commit();
119
120         } catch (Exception JavaDoc e) {
121             try {
122                 consumerSession.rollback();
123                 log.info("rolled back transaction");
124             } catch (JMSException e1) {
125                 log.info(e1);
126                 e1.printStackTrace();
127             }
128             log.info(e);
129             e.printStackTrace();
130         }
131     }
132
133
134     protected void consumeMessage(Message message, List JavaDoc messageList) {
135         messageList.add(message);
136         if (messageList.size() >= data.length) {
137             synchronized (lock) {
138                 lock.notifyAll();
139             }
140         }
141
142     }
143
144
145     protected List JavaDoc createConcurrentList() {
146         return Collections.synchronizedList(new ArrayList JavaDoc());
147     }
148
149
150     protected void waitForMessagesToBeDelivered() {
151         long maxWaitTime = 5000;
152         long waitTime = maxWaitTime;
153         long start = (maxWaitTime <= 0) ? 0 : System.currentTimeMillis();
154
155         synchronized (lock) {
156             while (messages.size() <= data.length && waitTime >= 0) {
157                 try {
158                     lock.wait(200);
159                 } catch (InterruptedException JavaDoc e) {
160                     e.printStackTrace();
161                 }
162
163                 waitTime = maxWaitTime - (System.currentTimeMillis() - start);
164             }
165         }
166     }
167
168
169     protected static void recursiveDelete(File JavaDoc file) {
170         if( file.isDirectory() ) {
171             File JavaDoc[] files = file.listFiles();
172             for (int i = 0; i < files.length; i++) {
173                 recursiveDelete(files[i]);
174             }
175         }
176         file.delete();
177     }
178
179     protected void tearDown() throws Exception JavaDoc {
180         if (connection != null) {
181             connection.close();
182         }
183
184         super.tearDown();
185     }
186 }
187
Popular Tags