package org.apache.activemq.usecases;

import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import junit.framework.TestCase;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/**
 * Checks that a queue consumer does not see the same JMS message ID twice
 * while a producer keeps sending and the consumer is closed and re-created
 * on the same session.
 *
 * Problems detected on background threads (the JMS listener thread and the
 * sending thread) are collected in {@link #failures} and asserted on the
 * JUnit thread at the end of the test; an assertion thrown on a background
 * thread would never reach the test runner.
 */
public class QueueDuplicatesTest extends TestCase {

    private static final Log log = LogFactory.getLog(QueueDuplicatesTest.class);

    /** Not thread-safe; only access through {@link #timestamp()}. */
    private static final DateFormat formatter = new SimpleDateFormat("HH:mm:ss SSS");

    private static final String PEER_URL = "peer://localhost:6099";
    private static final int MESSAGE_COUNT = 20;
    private static final long SEND_INTERVAL_MS = 1000;
    private static final long FIRST_CONSUME_MS = 5000;
    private static final long NO_CONSUMER_MS = 5000;
    private static final long SECOND_CONSUME_MS = 15000;
    /** Upper bound on how long the test waits for the sender after consuming ends. */
    private static final long SENDER_JOIN_TIMEOUT_MS = 30000;

    /** Failure descriptions reported by background threads; synchronized list. */
    private final List failures = Collections.synchronizedList(new ArrayList());

    private String subject;
    private Connection brokerConnection;

    public QueueDuplicatesTest(String name) {
        super(name);
    }

    /**
     * Opens and starts a connection to {@link #PEER_URL} and uses the test
     * class name as the queue name.
     */
    protected void setUp() throws Exception {
        subject = this.getClass().getName();

        ActiveMQConnectionFactory fac = createFactory(PEER_URL);
        brokerConnection = fac.createConnection();
        brokerConnection.start();
    }

    /** Closes the connection opened in {@link #setUp()}, if any. */
    protected void tearDown() throws Exception {
        if (brokerConnection != null) {
            brokerConnection.close();
        }
    }

    /**
     * Consumes for a while, closes the consumer while the producer is still
     * sending, re-creates the consumer, and finally fails if any listener saw
     * a repeated message ID or if any background thread reported an error.
     *
     * @throws Exception if JMS setup fails or the test thread is interrupted;
     *         propagated so that JUnit reports the test as an error
     */
    public void testDuplicates() throws Exception {
        Session session = createSession(brokerConnection);
        Thread sendingThread = new SendingThread(subject);
        try {
            Destination dest = session.createQueue(subject);

            MessageConsumer consumer = session.createConsumer(dest);
            consumer.setMessageListener(new SimpleConsumer(failures));

            sendingThread.start();
            Thread.sleep(FIRST_CONSUME_MS);

            // Drop the consumer while messages keep arriving on the queue.
            consumer.close();
            Thread.sleep(NO_CONSUMER_MS);

            // NOTE(review): each listener tracks IDs independently, as in the
            // original test, so a message redelivered to the second consumer
            // after being handled by the first is not flagged - confirm whether
            // the two listeners should share one ID map.
            consumer = session.createConsumer(dest);
            consumer.setMessageListener(new SimpleConsumer(failures));
            Thread.sleep(SECOND_CONSUME_MS);
        } finally {
            session.close();
        }

        // Let the producer finish before tearDown() closes the shared connection.
        sendingThread.join(SENDER_JOIN_TIMEOUT_MS);
        assertFalse("Sending thread did not finish in time", sendingThread.isAlive());
        assertTrue("Failures reported by background threads: " + failures, failures.isEmpty());
    }

    /**
     * Creates a non-transacted, auto-acknowledge session.
     *
     * @param peerConnection connection to create the session on
     * @return the new session
     * @throws JMSException if the session cannot be created
     */
    Session createSession(Connection peerConnection) throws JMSException {
        return peerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    }

    /**
     * Builds a connection factory pointing at the given broker URL.
     *
     * @param brokerUrl URL handed to {@code setBrokerURL}
     * @return the configured factory
     */
    private ActiveMQConnectionFactory createFactory(String brokerUrl) {
        ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory();
        cf.setBrokerURL(brokerUrl);
        return cf;
    }

    /**
     * Formats the current time; serialises access to the shared formatter
     * because it is called from both the sender and the listener thread.
     */
    private static String timestamp() {
        synchronized (formatter) {
            return formatter.format(new Date());
        }
    }

    /**
     * Sends {@link #MESSAGE_COUNT} non-persistent text messages to the queue,
     * one per {@link #SEND_INTERVAL_MS}, on its own session of the shared
     * connection. Any error is logged and recorded in {@link #failures}.
     */
    private class SendingThread extends Thread {
        private final String subject;

        SendingThread(String subject) {
            this.subject = subject;
            setDaemon(false);
        }

        public void run() {
            Session session = null;
            try {
                session = createSession(brokerConnection);
                Destination dest = session.createQueue(subject);
                MessageProducer producer = session.createProducer(dest);
                producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
                for (int i = 0; i < MESSAGE_COUNT; i++) {
                    String txt = "Text Message: " + i;
                    TextMessage msg = session.createTextMessage(txt);
                    producer.send(msg);
                    log.info(timestamp() + " Sent ==> " + msg + " to " + subject);
                    Thread.sleep(SEND_INTERVAL_MS);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                log.error("Sending thread interrupted", e);
                failures.add("Sending thread interrupted: " + e);
            } catch (Exception e) {
                log.error("Sending thread failed", e);
                failures.add("Sending thread failed: " + e);
            } finally {
                if (session != null) {
                    try {
                        session.close();
                    } catch (JMSException e) {
                        log.error("Failed to close sender session", e);
                        failures.add("Failed to close sender session: " + e);
                    }
                }
            }
        }
    }

    /**
     * Listener that remembers every JMS message ID it has received and
     * records a failure when an ID shows up a second time.
     */
    private static class SimpleConsumer implements MessageListener {
        private final Map msgs = new HashMap();
        private final List failures;

        /**
         * @param failures list that duplicate and error reports are appended to
         */
        SimpleConsumer(List failures) {
            this.failures = failures;
        }

        public void onMessage(Message message) {
            log.info(timestamp() + " SimpleConsumer Message Received: " + message);
            try {
                String id = message.getJMSMessageID();
                // put() returns the previous mapping, i.e. non-null on a repeat.
                if (msgs.put(id, message) != null) {
                    log.error("Message is duplicate: " + id);
                    failures.add("Message is duplicate: " + id);
                }
            } catch (Exception e) {
                log.error("Failed to inspect received message", e);
                failures.add("Failed to inspect received message: " + e);
            }
        }
    }
}