1 18 package org.apache.activemq.network; 19 20 import java.net.URI ; 21 22 import javax.jms.Connection ; 23 import javax.jms.DeliveryMode ; 24 import javax.jms.Destination ; 25 import javax.jms.JMSException ; 26 import javax.jms.Message ; 27 import javax.jms.MessageConsumer ; 28 import javax.jms.MessageListener ; 29 import javax.jms.MessageProducer ; 30 import javax.jms.Session ; 31 import javax.jms.TextMessage ; 32 import javax.jms.TopicRequestor ; 33 import javax.jms.TopicSession ; 34 35 import junit.framework.TestCase; 36 37 import org.apache.activemq.ActiveMQConnectionFactory; 38 import org.apache.activemq.broker.BrokerService; 39 import org.apache.activemq.command.ActiveMQTopic; 40 import org.apache.activemq.xbean.BrokerFactoryBean; 41 import org.apache.commons.logging.Log; 42 import org.apache.commons.logging.LogFactory; 43 import org.springframework.context.support.AbstractApplicationContext; 44 import org.springframework.core.io.ClassPathResource; 45 import org.springframework.core.io.Resource; 46 public class SimpleNetworkTest extends TestCase{ 47 48 protected static final Log log = LogFactory.getLog(SimpleNetworkTest.class); 49 50 protected static final int MESSAGE_COUNT=10; 51 protected AbstractApplicationContext context; 52 protected Connection localConnection; 53 protected Connection remoteConnection; 54 protected BrokerService localBroker; 55 protected BrokerService remoteBroker; 56 protected Session localSession; 57 protected Session remoteSession; 58 protected ActiveMQTopic included; 59 protected ActiveMQTopic excluded; 60 protected String consumerName="durableSubs"; 61 62 63 public void testRequestReply() throws Exception { 64 final MessageProducer remoteProducer=remoteSession.createProducer(null); 65 MessageConsumer remoteConsumer=remoteSession.createConsumer(included); 66 remoteConsumer.setMessageListener(new MessageListener (){ 67 public void onMessage(Message msg){ 68 try{ 69 TextMessage textMsg=(TextMessage ) msg; 70 String payload="REPLY: "+textMsg.getText(); 71 Destination replyTo; 72 replyTo=msg.getJMSReplyTo(); 73 textMsg.clearBody(); 74 textMsg.setText(payload); 75 remoteProducer.send(replyTo,textMsg); 76 }catch(JMSException e){ 77 e.printStackTrace(); 79 } 80 } 81 }); 82 83 TopicRequestor requestor=new TopicRequestor ((TopicSession ) localSession,included); 84 Thread.sleep(2000); for (int i =0;i < MESSAGE_COUNT; i++){ 86 TextMessage msg = localSession.createTextMessage("test msg: " +i); 87 TextMessage result = (TextMessage ) requestor.request(msg); 88 assertNotNull(result); 89 log.info(result.getText()); 90 } 91 } 92 93 public void testFiltering() throws Exception { 94 MessageConsumer includedConsumer=remoteSession.createConsumer(included); 95 MessageConsumer excludedConsumer=remoteSession.createConsumer(excluded); 96 MessageProducer includedProducer=localSession.createProducer(included); 97 MessageProducer excludedProducer=localSession.createProducer(excluded); 98 Thread.sleep(1000); 99 Message test=localSession.createTextMessage("test"); 100 includedProducer.send(test); 101 excludedProducer.send(test); 102 assertNull(excludedConsumer.receive(500)); 103 assertNotNull(includedConsumer.receive(500)); 104 } 105 106 public void testConduitBridge() throws Exception { 107 MessageConsumer consumer1=remoteSession.createConsumer(included); 108 MessageConsumer consumer2=remoteSession.createConsumer(included); 109 MessageProducer producer=localSession.createProducer(included); 110 producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); 111 Thread.sleep(1000); 112 for(int i=0;i<MESSAGE_COUNT;i++){ 113 Message test=localSession.createTextMessage("test-"+i); 114 producer.send(test); 115 assertNotNull(consumer1.receive(500)); 116 assertNotNull(consumer2.receive(500)); 117 } 118 assertNull(consumer1.receive(500)); 120 assertNull(consumer2.receive(500)); 121 } 122 123 public void testDurableStoreAndForward() throws Exception { 124 MessageConsumer remoteConsumer=remoteSession.createDurableSubscriber(included,consumerName); 126 Thread.sleep(1000); 127 doTearDown(); 129 doSetUp(); 130 MessageProducer producer=localSession.createProducer(included); 131 for(int i=0;i<MESSAGE_COUNT;i++){ 132 Message test=localSession.createTextMessage("test-"+i); 133 producer.send(test); 134 } 135 Thread.sleep(1000); 136 doTearDown(); 138 doSetUp(); 139 remoteConsumer=remoteSession.createDurableSubscriber(included,consumerName); 140 for(int i=0;i<MESSAGE_COUNT;i++){ 141 Message test=localSession.createTextMessage("test-"+i); 142 assertNotNull(remoteConsumer.receive(500)); 143 } 144 } 145 146 147 148 protected void setUp() throws Exception { 149 super.setUp(); 150 doSetUp(); 151 } 152 153 protected void tearDown() throws Exception { 154 localBroker.deleteAllMessages(); 155 remoteBroker.deleteAllMessages(); 156 doTearDown(); 157 super.tearDown(); 158 } 159 160 protected void doTearDown() throws Exception { 161 localConnection.close(); 162 remoteConnection.close(); 163 localBroker.stop(); 164 remoteBroker.stop(); 165 } 166 167 protected void doSetUp() throws Exception { 168 Resource resource=new ClassPathResource(getRemoteBrokerURI()); 169 BrokerFactoryBean factory=new BrokerFactoryBean(resource); 170 factory.afterPropertiesSet(); 171 remoteBroker=factory.getBroker(); 172 remoteBroker.start(); 173 174 resource=new ClassPathResource(getLocalBrokerURI()); 175 factory=new BrokerFactoryBean(resource); 176 factory.afterPropertiesSet(); 177 localBroker=factory.getBroker(); 178 179 localBroker.start(); 180 181 URI localURI=localBroker.getVmConnectorURI(); 182 ActiveMQConnectionFactory fac=new ActiveMQConnectionFactory(localURI); 183 localConnection=fac.createConnection(); 184 localConnection.setClientID("local"); 185 localConnection.start(); 186 URI remoteURI=remoteBroker.getVmConnectorURI(); 187 fac=new ActiveMQConnectionFactory(remoteURI); 188 remoteConnection=fac.createConnection(); 189 remoteConnection.setClientID("remote"); 190 remoteConnection.start(); 191 included=new ActiveMQTopic("include.test.bar"); 192 excluded=new ActiveMQTopic("exclude.test.bar"); 193 localSession=localConnection.createSession(false,Session.AUTO_ACKNOWLEDGE); 194 remoteSession=remoteConnection.createSession(false,Session.AUTO_ACKNOWLEDGE); 195 } 196 197 protected String getRemoteBrokerURI() { 198 return "org/apache/activemq/network/remoteBroker.xml"; 199 } 200 201 protected String getLocalBrokerURI() { 202 return "org/apache/activemq/network/localBroker.xml"; 203 } 204 } 205 | Popular Tags |