1 22 package org.jboss.test.jbossmq.test; 23 24 import java.util.Enumeration ; 25 import java.util.List ; 26 27 import javax.jms.DeliveryMode ; 28 import javax.jms.Queue ; 29 import javax.jms.QueueBrowser ; 30 import javax.jms.QueueReceiver ; 31 import javax.jms.QueueSender ; 32 import javax.jms.QueueSession ; 33 import javax.jms.Session ; 34 import javax.jms.TextMessage ; 35 import javax.management.Attribute ; 36 import javax.management.ObjectName ; 37 38 import junit.framework.Test; 39 import junit.framework.TestSuite; 40 41 import org.jboss.test.JBossTestSetup; 42 import org.jboss.util.collection.CollectionsUtil; 43 44 50 public class ExpiryDestinationTestCase 51 extends JBossMQUnitTest 52 { 53 54 static String DLQ_QUEUE = "queue/DLQ"; 55 int times; 56 ObjectName dlq; 57 ObjectName tq; 58 59 public ExpiryDestinationTestCase(String name) throws Exception 60 { 61 super(name); 62 getLog().debug(getName()); 63 } 64 65 private List list(QueueSession session, Queue queue) throws Exception 66 { 67 QueueBrowser browser = session.createBrowser( queue ); 68 Enumeration e = browser.getEnumeration(); 69 return CollectionsUtil.list(e); 70 } 71 72 private int size(QueueSession session, Queue queue) throws Exception 73 { 74 List l = list(session, queue); 75 return l.size(); 76 } 77 78 private void assertSize(QueueSession session, Queue queue, int size) throws Exception 79 { 80 assertEquals(size, size(session, queue)); 81 } 82 83 protected void setUp() throws Exception 84 { 85 super.setUp(); 86 times = getIterationCount(); 87 dlq = new ObjectName ("jboss.mq.destination:service=Queue,name=DLQ"); 88 tq = new ObjectName ("jboss.mq.destination:service=Queue,name=testQueue"); 89 getServer().invoke(dlq, "removeAllMessages", null, null); 90 getServer().invoke(tq, "removeAllMessages", null, null); 91 getServer().setAttribute(tq, new Attribute ("ExpiryDestination", dlq)); 92 getServer().invoke(tq, "stop", null, null); 93 getServer().invoke(tq, "start", null, null); 94 95 connect(); 96 queueConnection.start(); 97 drainQueue(); 98 } 99 100 protected void tearDown() throws Exception 101 { 102 super.tearDown(); 103 getServer().invoke(dlq, "removeAllMessages", null, null); 104 getServer().invoke(tq, "removeAllMessages", null, null); 105 getServer().setAttribute(tq, new Attribute ("ExpiryDestination", null)); 106 } 107 108 111 public void testExpiredMessagesMove() throws Exception 112 { 113 QueueSession session = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); 114 Queue queue = (Queue )context.lookup(TEST_QUEUE); 115 QueueSender sender = session.createSender(queue); 116 117 Queue queue2 = (Queue )context.lookup(DLQ_QUEUE); 118 session.createBrowser( queue ); 119 120 long now = System.currentTimeMillis(); 121 122 TextMessage message = session.createTextMessage(); 123 message.setStringProperty("foo", "bar"); 124 String text = "expire on server"; 125 message.setText(text); 126 sender.send(message, DeliveryMode.PERSISTENT, 4, 1); 127 128 Thread.sleep(1000); 129 130 assertSize(session, queue, 0); 131 assertSize(session, queue2, 1); 132 133 QueueReceiver receiver = session.createReceiver(queue2); 134 message = (TextMessage ) receiver.receiveNoWait(); 135 assertEquals("QUEUE.testQueue", message.getStringProperty("JBOSS_ORIG_DESTINATION")); 136 assertTrue(message.getLongProperty("JBOSS_ORIG_EXPIRATION") > now); 137 assertEquals(0L, message.getJMSExpiration()); 138 assertEquals(text, message.getText()); 139 assertEquals("bar", message.getStringProperty("foo")); 140 receiver.close(); 141 session.close(); 142 143 disconnect(); 144 getLog().debug("passed"); 145 } 146 147 150 public void testSomeLoad() throws Exception 151 { 152 QueueSession session = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); 153 Queue queue = (Queue )context.lookup(TEST_QUEUE); 154 Queue queue2 = (Queue )context.lookup(DLQ_QUEUE); 155 QueueSender sender = session.createSender(queue); 156 TextMessage message = session.createTextMessage(); 157 158 for (int i = 0; i < times; i++) 159 sender.send(message, DeliveryMode.PERSISTENT, 4, 1); 160 161 for (int tries = 0; tries < 60; tries++) { 162 Thread.sleep(1000); 163 if (size(session, queue) == 0 && 164 size(session, queue2) == times) 165 break; 166 } 167 assertSize(session, queue, 0); 168 assertSize(session, queue2, times); 169 170 getLog().debug("test case where expired messages fail to move"); 171 for (int i = 0; i < times; i++) { 172 if (i % 10 == 0) 173 getServer().invoke(dlq, "stop", null, null); 174 if (i % 10 == 5) 175 getServer().invoke(dlq, "start", null, null); 176 sender.send(message, DeliveryMode.PERSISTENT, 4, 1); 177 } 178 179 Thread.sleep(1000); 180 181 getServer().invoke(dlq, "start", null, null); 182 getServer().invoke(tq, "stop", null, null); 183 getServer().invoke(tq, "start", null, null); 184 185 for (int tries = 0; tries < 60; tries++) { 186 Thread.sleep(1000); 187 if (size(session, queue) == 0 && 188 size(session, queue2) == times * 2) 189 break; 190 } 191 assertSize(session, queue, 0); 192 assertSize(session, queue2, times * 2); 193 } 194 195 public static Test suite() throws Exception 196 { 197 TestSuite suite = new TestSuite(); 198 suite.addTest(new JBossTestSetup(new ExpiryDestinationTestCase("testExpiredMessagesMove"))); 199 suite.addTest(new JBossTestSetup(new ExpiryDestinationTestCase("testSomeLoad"))); 200 return suite; 201 } 202 } 203 | Popular Tags |