1 18 package org.apache.activemq.usecases; 19 20 import org.apache.activemq.test.JmsTopicSendReceiveWithTwoConnectionsTest; 21 22 import javax.jms.Destination ; 23 import javax.jms.JMSException ; 24 import javax.jms.Message ; 25 import javax.jms.MessageProducer ; 26 27 30 public class PublishOnTopicConsumedMessageTest extends JmsTopicSendReceiveWithTwoConnectionsTest { 31 private MessageProducer replyProducer; 32 33 34 public synchronized void onMessage(Message message) { 35 36 try { 38 Message msgCopy = (Message )((org.apache.activemq.command.Message)message).copy(); 39 replyProducer.send(msgCopy); 40 41 super.onMessage(message); 43 } 44 catch (JMSException e) { 45 log.info("Failed to send message: " + e); 46 e.printStackTrace(); 47 } 48 } 49 50 protected void setUp() throws Exception { 51 super.setUp(); 52 53 Destination replyDestination = null; 54 55 if (topic) { 56 replyDestination = receiveSession.createTopic("REPLY." + getSubject()); 57 } 58 else { 59 replyDestination = receiveSession.createQueue("REPLY." + getSubject()); 60 } 61 62 replyProducer = receiveSession.createProducer(replyDestination); 63 log.info("Created replyProducer: " + replyProducer); 64 65 } 66 } 67 | Popular Tags |