1 18 package org.apache.activemq.network.jms; 19 20 import javax.jms.Destination ; 21 import javax.jms.JMSException ; 22 import javax.jms.Message ; 23 import javax.jms.MessageConsumer ; 24 import javax.jms.MessageListener ; 25 import javax.jms.MessageProducer ; 26 import javax.jms.Queue ; 27 import javax.jms.QueueConnection ; 28 import javax.jms.QueueRequestor ; 29 import javax.jms.QueueSession ; 30 import javax.jms.Session ; 31 import javax.jms.TextMessage ; 32 33 import junit.framework.TestCase; 34 35 import org.apache.activemq.ActiveMQConnectionFactory; 36 import org.apache.commons.logging.Log; 37 import org.apache.commons.logging.LogFactory; 38 import org.springframework.context.support.AbstractApplicationContext; 39 import org.springframework.context.support.ClassPathXmlApplicationContext; 40 41 public class QueueBridgeTest extends TestCase implements MessageListener { 42 43 protected static final Log log = LogFactory.getLog(QueueBridgeTest.class); 44 45 protected static final int MESSAGE_COUNT = 10; 46 protected AbstractApplicationContext context; 47 protected QueueConnection localConnection; 48 protected QueueConnection remoteConnection; 49 protected QueueRequestor requestor; 50 protected QueueSession requestServerSession; 51 protected MessageConsumer requestServerConsumer; 52 protected MessageProducer requestServerProducer; 53 54 protected void setUp() throws Exception { 55 super.setUp(); 56 context = createApplicationContext(); 57 58 createConnections(); 59 60 requestServerSession = localConnection.createQueueSession(false,Session.AUTO_ACKNOWLEDGE); 61 Queue theQueue = requestServerSession.createQueue(getClass().getName()); 62 requestServerConsumer = requestServerSession.createConsumer(theQueue); 63 requestServerConsumer.setMessageListener(this); 64 requestServerProducer = requestServerSession.createProducer(null); 65 66 QueueSession session = remoteConnection.createQueueSession(false,Session.AUTO_ACKNOWLEDGE); 67 requestor = new QueueRequestor (session,theQueue); 68 } 69 70 71 protected void createConnections() throws JMSException { 72 ActiveMQConnectionFactory fac = (ActiveMQConnectionFactory) context.getBean("localFactory"); 73 localConnection = fac.createQueueConnection(); 74 localConnection.start(); 75 76 fac = (ActiveMQConnectionFactory) context.getBean("remoteFactory"); 77 remoteConnection = fac.createQueueConnection(); 78 remoteConnection.start(); 79 } 80 81 82 protected AbstractApplicationContext createApplicationContext() { 83 return new ClassPathXmlApplicationContext("org/apache/activemq/network/jms/queue-config.xml"); 84 } 85 86 87 protected void tearDown() throws Exception { 88 localConnection.close(); 89 super.tearDown(); 90 } 91 92 public void testQueueRequestorOverBridge() throws JMSException { 93 for (int i =0;i < MESSAGE_COUNT; i++){ 94 TextMessage msg = requestServerSession.createTextMessage("test msg: " +i); 95 TextMessage result = (TextMessage ) requestor.request(msg); 96 assertNotNull(result); 97 log.info(result.getText()); 98 } 99 } 100 101 public void onMessage(Message msg){ 102 try{ 103 TextMessage textMsg=(TextMessage ) msg; 104 String payload="REPLY: "+textMsg.getText(); 105 Destination replyTo; 106 replyTo=msg.getJMSReplyTo(); 107 textMsg.clearBody(); 108 textMsg.setText(payload); 109 requestServerProducer.send(replyTo,textMsg); 110 }catch(JMSException e){ 111 e.printStackTrace(); 113 } 114 } 115 116 } 117 | Popular Tags |