1 18 package org.apache.activemq.network.jms; 19 20 import javax.jms.Destination ; 21 import javax.jms.JMSException ; 22 import javax.jms.Message ; 23 import javax.jms.MessageConsumer ; 24 import javax.jms.MessageListener ; 25 import javax.jms.MessageProducer ; 26 import javax.jms.Session ; 27 import javax.jms.TextMessage ; 28 import javax.jms.Topic ; 29 import javax.jms.TopicConnection ; 30 import javax.jms.TopicRequestor ; 31 import javax.jms.TopicSession ; 32 import junit.framework.TestCase; 33 34 import org.apache.activemq.ActiveMQConnectionFactory; 35 import org.apache.commons.logging.Log; 36 import org.apache.commons.logging.LogFactory; 37 import org.springframework.context.support.AbstractApplicationContext; 38 import org.springframework.context.support.ClassPathXmlApplicationContext; 39 40 public class TopicBridgeSpringTest extends TestCase implements MessageListener { 41 42 protected static final Log log = LogFactory.getLog(TopicBridgeSpringTest.class); 43 44 protected static final int MESSAGE_COUNT = 10; 45 protected AbstractApplicationContext context; 46 protected TopicConnection localConnection; 47 protected TopicConnection remoteConnection; 48 protected TopicRequestor requestor; 49 protected TopicSession requestServerSession; 50 protected MessageConsumer requestServerConsumer; 51 protected MessageProducer requestServerProducer; 52 53 protected void setUp() throws Exception { 54 55 super.setUp(); 56 context = createApplicationContext(); 57 ActiveMQConnectionFactory fac = (ActiveMQConnectionFactory) context.getBean("localFactory"); 58 localConnection = fac.createTopicConnection(); 59 localConnection.start(); 60 requestServerSession = localConnection.createTopicSession(false,Session.AUTO_ACKNOWLEDGE); 61 Topic theTopic = requestServerSession.createTopic(getClass().getName()); 62 requestServerConsumer = requestServerSession.createConsumer(theTopic); 63 requestServerConsumer.setMessageListener(this); 64 requestServerProducer = requestServerSession.createProducer(null); 65 66 fac = (ActiveMQConnectionFactory) context.getBean("remoteFactory"); 67 remoteConnection = fac.createTopicConnection(); 68 remoteConnection.start(); 69 TopicSession session = remoteConnection.createTopicSession(false,Session.AUTO_ACKNOWLEDGE); 70 requestor = new TopicRequestor (session,theTopic); 71 } 72 73 74 protected AbstractApplicationContext createApplicationContext() { 75 return new ClassPathXmlApplicationContext("org/apache/activemq/network/jms/topic-spring.xml"); 76 } 77 78 79 protected void tearDown() throws Exception { 80 localConnection.close(); 81 super.tearDown(); 82 } 83 84 public void testTopicRequestorOverBridge() throws JMSException { 85 for (int i =0;i < MESSAGE_COUNT; i++){ 86 TextMessage msg = requestServerSession.createTextMessage("test msg: " +i); 87 log.info("Making request: " + msg); 88 TextMessage result = (TextMessage ) requestor.request(msg); 89 assertNotNull(result); 90 log.info("Received result: " + result.getText()); 91 } 92 } 93 94 public void onMessage(Message msg){ 95 try{ 96 TextMessage textMsg=(TextMessage ) msg; 97 String payload="REPLY: "+textMsg.getText(); 98 Destination replyTo; 99 replyTo=msg.getJMSReplyTo(); 100 textMsg.clearBody(); 101 textMsg.setText(payload); 102 log.info("Sending response: " + textMsg); 103 requestServerProducer.send(replyTo,textMsg); 104 }catch(JMSException e){ 105 e.printStackTrace(); 107 } 108 } 109 110 } 111 | Popular Tags |