1 18 package org.apache.activemq.usecases; 19 20 import junit.framework.TestCase; 21 import org.apache.activemq.ActiveMQConnectionFactory; 22 import org.apache.activemq.util.IOHelper; 23 import org.apache.activemq.command.ActiveMQQueue; 24 import org.apache.commons.logging.Log; 25 import org.apache.commons.logging.LogFactory; 26 27 import javax.jms.*; 28 import java.util.List ; 29 import java.util.Collections ; 30 import java.util.ArrayList ; 31 import java.io.File ; 32 33 34 public final class PublishOnQueueConsumedMessageInTransactionTest extends TestCase implements MessageListener { 35 36 private static final Log log = LogFactory.getLog(PublishOnQueueConsumedMessageInTransactionTest.class); 37 38 private Session producerSession; 39 private Session consumerSession; 40 private Destination queue; 41 private ActiveMQConnectionFactory factory; 42 private MessageProducer producer; 43 private MessageConsumer consumer; 44 private Connection connection; 45 private ObjectMessage objectMessage = null; 46 private List messages = createConcurrentList(); 47 private final Object lock = new Object (); 48 private String [] data; 49 private String DATAFILE_ROOT = IOHelper.getDefaultDataDirectory(); 50 private int messageCount = 3; 51 private String url = "vm://localhost"; 52 53 57 58 protected void setUp() throws Exception { 59 File dataFile = new File (DATAFILE_ROOT); 60 recursiveDelete(dataFile); 61 try { 62 factory = new ActiveMQConnectionFactory(url); 63 connection = factory.createConnection(); 64 producerSession = connection.createSession(true, Session.SESSION_TRANSACTED); 65 consumerSession = connection.createSession(true, Session.SESSION_TRANSACTED); 66 queue = new ActiveMQQueue("FOO.BAR"); 67 data = new String [messageCount]; 68 69 for (int i = 0; i < messageCount; i++) { 70 data[i] = "Message : " + i; 71 } 72 } catch (JMSException je) { 73 fail("Error setting up connection : " + je.toString()); 74 } 75 } 76 77 78 public void testSendReceive() throws Exception { 79 sendMessage(); 80 81 connection.start(); 82 consumer = consumerSession.createConsumer(queue); 83 consumer.setMessageListener(this); 84 waitForMessagesToBeDelivered(); 85 assertEquals("Messages received doesn't equal messages sent", messages.size(),data.length); 86 87 } 88 89 90 protected void sendMessage() throws JMSException { 91 messages.clear(); 92 try { 93 for (int i = 0; i < data.length; ++i) { 94 producer = producerSession.createProducer(queue); 95 objectMessage = producerSession.createObjectMessage(data[i]); 96 producer.send(objectMessage); 97 producerSession.commit(); 98 log.info("sending message :" + objectMessage); 99 } 100 } catch (Exception e) { 101 if (producerSession != null) { 102 producerSession.rollback(); 103 log.info("rollback"); 104 producerSession.close(); 105 } 106 107 e.printStackTrace(); 108 } 109 } 110 111 112 public synchronized void onMessage(Message m) { 113 try { 114 objectMessage = (ObjectMessage) m; 115 consumeMessage(objectMessage,messages); 116 117 log.info("consumer received message :" + objectMessage); 118 consumerSession.commit(); 119 120 } catch (Exception e) { 121 try { 122 consumerSession.rollback(); 123 log.info("rolled back transaction"); 124 } catch (JMSException e1) { 125 log.info(e1); 126 e1.printStackTrace(); 127 } 128 log.info(e); 129 e.printStackTrace(); 130 } 131 } 132 133 134 protected void consumeMessage(Message message, List messageList) { 135 messageList.add(message); 136 if (messageList.size() >= data.length) { 137 synchronized (lock) { 138 lock.notifyAll(); 139 } 140 } 141 142 } 143 144 145 protected List createConcurrentList() { 146 return Collections.synchronizedList(new ArrayList ()); 147 } 148 149 150 protected void waitForMessagesToBeDelivered() { 151 long maxWaitTime = 5000; 152 long waitTime = maxWaitTime; 153 long start = (maxWaitTime <= 0) ? 0 : System.currentTimeMillis(); 154 155 synchronized (lock) { 156 while (messages.size() <= data.length && waitTime >= 0) { 157 try { 158 lock.wait(200); 159 } catch (InterruptedException e) { 160 e.printStackTrace(); 161 } 162 163 waitTime = maxWaitTime - (System.currentTimeMillis() - start); 164 } 165 } 166 } 167 168 169 protected static void recursiveDelete(File file) { 170 if( file.isDirectory() ) { 171 File [] files = file.listFiles(); 172 for (int i = 0; i < files.length; i++) { 173 recursiveDelete(files[i]); 174 } 175 } 176 file.delete(); 177 } 178 179 protected void tearDown() throws Exception { 180 if (connection != null) { 181 connection.close(); 182 } 183 184 super.tearDown(); 185 } 186 } 187 | Popular Tags |