1 18 package org.apache.activemq.broker.advisory; 19 20 import junit.framework.Test; 21 22 import org.apache.activemq.advisory.AdvisorySupport; 23 import org.apache.activemq.broker.BrokerTestSupport; 24 import org.apache.activemq.broker.StubConnection; 25 import org.apache.activemq.command.ActiveMQDestination; 26 import org.apache.activemq.command.ActiveMQQueue; 27 import org.apache.activemq.command.ConnectionInfo; 28 import org.apache.activemq.command.ConsumerInfo; 29 import org.apache.activemq.command.Message; 30 import org.apache.activemq.command.ProducerInfo; 31 import org.apache.activemq.command.RemoveInfo; 32 import org.apache.activemq.command.SessionInfo; 33 34 public class AdvisoryBrokerTest extends BrokerTestSupport { 35 36 public void testConnectionAdvisories() throws Exception { 37 38 ActiveMQDestination destination = AdvisorySupport.getConnectionAdvisoryTopic(); 39 40 StubConnection connection1 = createConnection(); 42 ConnectionInfo connectionInfo1 = createConnectionInfo(); 43 SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1); 44 ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, destination); 45 consumerInfo1.setPrefetchSize(100); 46 47 connection1.send(connectionInfo1); 48 connection1.send(sessionInfo1); 49 connection1.send(consumerInfo1); 50 51 Message m1 = receiveMessage(connection1); 53 assertNotNull(m1); 54 assertNotNull(m1.getDataStructure()); 55 assertEquals(((ConnectionInfo)m1.getDataStructure()).getConnectionId(), connectionInfo1.getConnectionId()); 56 57 StubConnection connection2 = createConnection(); 59 ConnectionInfo connectionInfo2 = createConnectionInfo(); 60 connection2.send(connectionInfo2); 61 62 m1 = receiveMessage(connection1); 64 assertNotNull(m1); 65 assertNotNull(m1.getDataStructure()); 66 assertEquals(((ConnectionInfo)m1.getDataStructure()).getConnectionId(), connectionInfo2.getConnectionId()); 67 68 connection2.send(closeConnectionInfo(connectionInfo2)); 70 connection2.stop(); 71 72 m1 = receiveMessage(connection1); 74 assertNotNull(m1); 75 assertNotNull(m1.getDataStructure()); 76 RemoveInfo r = (RemoveInfo) m1.getDataStructure(); 77 assertEquals(r.getObjectId(), connectionInfo2.getConnectionId()); 78 79 assertNoMessagesLeft(connection1); 80 } 81 82 public void testConsumerAdvisories() throws Exception { 83 84 ActiveMQDestination queue = new ActiveMQQueue("test"); 85 ActiveMQDestination destination = AdvisorySupport.getConsumerAdvisoryTopic(queue); 86 87 StubConnection connection1 = createConnection(); 89 ConnectionInfo connectionInfo1 = createConnectionInfo(); 90 SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1); 91 ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, destination); 92 consumerInfo1.setPrefetchSize(100); 93 94 connection1.send(connectionInfo1); 95 connection1.send(sessionInfo1); 96 connection1.send(consumerInfo1); 97 98 assertNoMessagesLeft(connection1); 100 101 StubConnection connection2 = createConnection(); 103 ConnectionInfo connectionInfo2 = createConnectionInfo(); 104 SessionInfo sessionInfo2 = createSessionInfo(connectionInfo2); 105 ConsumerInfo consumerInfo2 = createConsumerInfo(sessionInfo2, queue); 106 consumerInfo1.setPrefetchSize(100); 107 108 connection2.send(connectionInfo2); 109 connection2.send(sessionInfo2); 110 connection2.send(consumerInfo2); 111 112 Message m1 = receiveMessage(connection1); 114 assertNotNull(m1); 115 assertNotNull(m1.getDataStructure()); 116 assertEquals(((ConsumerInfo)m1.getDataStructure()).getConsumerId(), consumerInfo2.getConsumerId()); 117 118 connection2.request(closeConnectionInfo(connectionInfo2)); 120 connection2.stop(); 121 122 m1 = receiveMessage(connection1); 124 assertNotNull(m1); 125 assertNotNull(m1.getDataStructure()); 126 RemoveInfo r = (RemoveInfo) m1.getDataStructure(); 127 assertEquals(r.getObjectId(), consumerInfo2.getConsumerId()); 128 129 assertNoMessagesLeft(connection2); 130 } 131 132 public void testConsumerAdvisoriesReplayed() throws Exception { 133 134 ActiveMQDestination queue = new ActiveMQQueue("test"); 135 ActiveMQDestination destination = AdvisorySupport.getConsumerAdvisoryTopic(queue); 136 137 StubConnection connection1 = createConnection(); 139 ConnectionInfo connectionInfo1 = createConnectionInfo(); 140 SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1); 141 142 connection1.send(connectionInfo1); 143 connection1.send(sessionInfo1); 144 145 StubConnection connection2 = createConnection(); 147 ConnectionInfo connectionInfo2 = createConnectionInfo(); 148 SessionInfo sessionInfo2 = createSessionInfo(connectionInfo2); 149 ConsumerInfo consumerInfo2 = createConsumerInfo(sessionInfo2, queue); 150 consumerInfo2.setPrefetchSize(100); 151 connection2.send(connectionInfo2); 152 connection2.send(sessionInfo2); 153 connection2.send(consumerInfo2); 154 155 ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, destination); 157 consumerInfo1.setPrefetchSize(100); 158 connection1.send(consumerInfo1); 159 160 Message m1 = receiveMessage(connection1); 161 assertNotNull(m1); 162 assertNotNull(m1.getDataStructure()); 163 assertEquals(((ConsumerInfo)m1.getDataStructure()).getConsumerId(), consumerInfo2.getConsumerId()); 164 165 connection2.request(closeConnectionInfo(connectionInfo2)); 167 connection2.stop(); 168 169 m1 = receiveMessage(connection1); 171 assertNotNull(m1); 172 assertNotNull(m1.getDataStructure()); 173 RemoveInfo r = (RemoveInfo) m1.getDataStructure(); 174 assertEquals(r.getObjectId(), consumerInfo2.getConsumerId()); 175 176 assertNoMessagesLeft(connection2); 177 } 178 179 180 public void testProducerAdvisories() throws Exception { 181 182 ActiveMQDestination queue = new ActiveMQQueue("test"); 183 ActiveMQDestination destination = AdvisorySupport.getProducerAdvisoryTopic(queue); 184 185 StubConnection connection1 = createConnection(); 187 ConnectionInfo connectionInfo1 = createConnectionInfo(); 188 SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1); 189 ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, destination); 190 consumerInfo1.setPrefetchSize(100); 191 192 connection1.send(connectionInfo1); 193 connection1.send(sessionInfo1); 194 connection1.send(consumerInfo1); 195 196 assertNoMessagesLeft(connection1); 197 198 StubConnection connection2 = createConnection(); 200 ConnectionInfo connectionInfo2 = createConnectionInfo(); 201 SessionInfo sessionInfo2 = createSessionInfo(connectionInfo2); 202 ProducerInfo producerInfo2 = createProducerInfo(sessionInfo2); 203 producerInfo2.setDestination(queue); 204 205 connection2.send(connectionInfo2); 206 connection2.send(sessionInfo2); 207 connection2.send(producerInfo2); 208 209 Message m1 = receiveMessage(connection1); 211 assertNotNull(m1); 212 assertNotNull(m1.getDataStructure()); 213 assertEquals(((ProducerInfo)m1.getDataStructure()).getProducerId(), producerInfo2.getProducerId()); 214 215 connection2.request(closeConnectionInfo(connectionInfo2)); 217 connection2.stop(); 218 219 m1 = receiveMessage(connection1); 221 assertNotNull(m1); 222 assertNotNull(m1.getDataStructure()); 223 RemoveInfo r = (RemoveInfo) m1.getDataStructure(); 224 assertEquals(r.getObjectId(), producerInfo2.getProducerId()); 225 226 assertNoMessagesLeft(connection2); 227 } 228 229 public void testProducerAdvisoriesReplayed() throws Exception { 230 231 ActiveMQDestination queue = new ActiveMQQueue("test"); 232 ActiveMQDestination destination = AdvisorySupport.getProducerAdvisoryTopic(queue); 233 234 StubConnection connection1 = createConnection(); 236 ConnectionInfo connectionInfo1 = createConnectionInfo(); 237 SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1); 238 239 connection1.send(connectionInfo1); 240 connection1.send(sessionInfo1); 241 242 StubConnection connection2 = createConnection(); 244 ConnectionInfo connectionInfo2 = createConnectionInfo(); 245 SessionInfo sessionInfo2 = createSessionInfo(connectionInfo2); 246 ProducerInfo producerInfo2 = createProducerInfo(sessionInfo2); 247 producerInfo2.setDestination(queue); 248 249 connection2.send(connectionInfo2); 250 connection2.send(sessionInfo2); 251 connection2.send(producerInfo2); 252 253 ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, destination); 255 consumerInfo1.setPrefetchSize(100); 256 connection1.send(consumerInfo1); 257 258 Message m1 = receiveMessage(connection1); 259 assertNotNull(m1); 260 assertNotNull(m1.getDataStructure()); 261 assertEquals(((ProducerInfo)m1.getDataStructure()).getProducerId(), producerInfo2.getProducerId()); 262 263 connection2.request(closeConnectionInfo(connectionInfo2)); 265 connection2.stop(); 266 267 m1 = receiveMessage(connection1); 269 assertNotNull(m1); 270 assertNotNull(m1.getDataStructure()); 271 RemoveInfo r = (RemoveInfo) m1.getDataStructure(); 272 assertEquals(r.getObjectId(), producerInfo2.getProducerId()); 273 274 assertNoMessagesLeft(connection2); 275 } 276 277 public void testProducerAdvisoriesReplayedOnlyTargetNewConsumer() throws Exception { 278 279 ActiveMQDestination queue = new ActiveMQQueue("test"); 280 ActiveMQDestination destination = AdvisorySupport.getProducerAdvisoryTopic(queue); 281 282 StubConnection connection1 = createConnection(); 284 ConnectionInfo connectionInfo1 = createConnectionInfo(); 285 SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1); 286 connection1.send(connectionInfo1); 287 connection1.send(sessionInfo1); 288 ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, destination); 290 consumerInfo1.setPrefetchSize(100); 291 connection1.send(consumerInfo1); 292 293 StubConnection connection2 = createConnection(); 295 ConnectionInfo connectionInfo2 = createConnectionInfo(); 296 SessionInfo sessionInfo2 = createSessionInfo(connectionInfo2); 297 ProducerInfo producerInfo2 = createProducerInfo(sessionInfo2); 298 producerInfo2.setDestination(queue); 299 connection2.send(connectionInfo2); 300 connection2.send(sessionInfo2); 301 connection2.send(producerInfo2); 302 303 Message m1 = receiveMessage(connection1); 304 assertNotNull(m1); 305 assertNotNull(m1.getDataStructure()); 306 assertEquals(((ProducerInfo)m1.getDataStructure()).getProducerId(), producerInfo2.getProducerId()); 307 308 ConsumerInfo consumerInfo2 = createConsumerInfo(sessionInfo2, destination); 310 consumerInfo2.setPrefetchSize(100); 311 connection2.send(consumerInfo2); 312 313 m1 = receiveMessage(connection2); 315 assertNotNull(m1); 316 assertNotNull(m1.getDataStructure()); 317 assertEquals(((ProducerInfo)m1.getDataStructure()).getProducerId(), producerInfo2.getProducerId()); 318 319 assertNoMessagesLeft(connection1); 321 } 322 323 public static Test suite() { 324 return suite(AdvisoryBrokerTest.class); 325 } 326 327 public static void main(String [] args) { 328 junit.textui.TestRunner.run(suite()); 329 } 330 331 } 332 | Popular Tags |