1 18 package org.apache.activemq; 19 20 import java.util.List ; 21 import java.util.Vector ; 22 23 import javax.jms.Connection ; 24 import javax.jms.Destination ; 25 import javax.jms.JMSException ; 26 import javax.jms.Message ; 27 import javax.jms.MessageConsumer ; 28 import javax.jms.MessageListener ; 29 import javax.jms.MessageProducer ; 30 import javax.jms.Session ; 31 import javax.jms.TextMessage ; 32 33 import org.apache.activemq.test.TestSupport; 34 import org.apache.commons.logging.Log; 35 import org.apache.commons.logging.LogFactory; 36 37 40 public class JmsTopicRequestReplyTest extends TestSupport implements MessageListener { 41 private final Log log = LogFactory.getLog(getClass()); 42 43 private Connection serverConnection; 44 private Connection clientConnection; 45 private MessageProducer replyProducer; 46 private Session serverSession; 47 private Destination requestDestination; 48 private List failures = new Vector (); 49 private boolean dynamicallyCreateProducer; 50 protected boolean useAsyncConsume = false; 51 private String clientSideClientID; 52 53 public void testSendAndReceive() throws Exception { 54 clientConnection = createConnection(); 55 clientConnection.setClientID("ClientConnection:" + getSubject()); 56 57 Session session = clientConnection.createSession(false, 58 Session.AUTO_ACKNOWLEDGE); 59 60 clientConnection.start(); 61 62 Destination replyDestination = createTemporaryDestination(session); 63 64 65 clientSideClientID = clientConnection.getClientID(); 67 68 log.info("Both the clientID and destination clientID match properly: " + clientSideClientID); 72 73 74 75 MessageProducer requestProducer = 76 session.createProducer(requestDestination); 77 MessageConsumer replyConsumer = 78 session.createConsumer(replyDestination); 79 80 81 82 TextMessage requestMessage = session.createTextMessage("Olivier"); 83 requestMessage.setJMSReplyTo(replyDestination); 84 requestProducer.send(requestMessage); 85 86 log.info("Sent request."); 87 log.info(requestMessage.toString()); 88 89 Message msg = replyConsumer.receive(5000); 90 91 92 if (msg instanceof TextMessage ) { 93 TextMessage replyMessage = (TextMessage ) msg; 94 log.info("Received reply."); 95 log.info(replyMessage.toString()); 96 assertEquals("Wrong message content", "Hello: Olivier", replyMessage.getText()); 97 } 98 else { 99 fail("Should have received a reply by now"); 100 } 101 102 assertEquals("Should not have had any failures: " + failures, 0, failures.size()); 103 } 104 105 public void testSendAndReceiveWithDynamicallyCreatedProducer() throws Exception { 106 dynamicallyCreateProducer = true; 107 testSendAndReceive(); 108 } 109 110 113 public void onMessage(Message message) { 114 try { 115 TextMessage requestMessage = (TextMessage ) message; 116 117 log.info("Received request."); 118 log.info(requestMessage.toString()); 119 120 Destination replyDestination = requestMessage.getJMSReplyTo(); 121 122 126 TextMessage replyMessage = serverSession.createTextMessage("Hello: " + requestMessage.getText()); 127 128 replyMessage.setJMSCorrelationID(requestMessage.getJMSMessageID()); 129 130 if (dynamicallyCreateProducer) { 131 replyProducer = serverSession.createProducer(replyDestination); 132 replyProducer.send(replyMessage); 133 } 134 else { 135 replyProducer.send(replyDestination, replyMessage); 136 } 137 138 log.info("Sent reply."); 139 log.info(replyMessage.toString()); 140 } 141 catch (JMSException e) { 142 onException(e); 143 } 144 } 145 146 149 protected void syncConsumeLoop(MessageConsumer requestConsumer) { 150 try { 151 Message message = requestConsumer.receive(5000); 152 if (message != null) { 153 onMessage(message); 154 } 155 else { 156 log.error("No message received"); 157 } 158 } 159 catch (JMSException e) { 160 onException(e); 161 } 162 } 163 164 165 protected void setUp() throws Exception { 166 super.setUp(); 167 168 serverConnection = createConnection(); 169 serverConnection.setClientID("serverConnection:" + getSubject()); 170 serverSession = serverConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); 171 172 replyProducer = serverSession.createProducer(null); 173 174 requestDestination = createDestination(serverSession); 175 176 177 final MessageConsumer requestConsumer = serverSession.createConsumer(requestDestination); 178 if (useAsyncConsume) { 179 requestConsumer.setMessageListener(this); 180 } 181 else { 182 Thread thread = new Thread (new Runnable () { 183 public void run() { 184 syncConsumeLoop(requestConsumer); 185 } 186 }); 187 thread.start(); 188 } 189 serverConnection.start(); 190 } 191 192 protected void tearDown() throws Exception { 193 super.tearDown(); 194 195 serverConnection.close(); 196 clientConnection.stop(); 197 clientConnection.close(); 198 } 199 200 protected void onException(JMSException e) { 201 log.info("Caught: " + e); 202 e.printStackTrace(); 203 failures.add(e); 204 } 205 206 protected Destination createDestination(Session session) throws JMSException { 207 if (topic) { 208 return session.createTopic(getSubject()); 209 } 210 return session.createQueue(getSubject()); 211 } 212 213 protected Destination createTemporaryDestination(Session session) throws JMSException { 214 if (topic) { 215 return session.createTemporaryTopic(); 216 } 217 return session.createTemporaryQueue(); 218 } 219 220 } 221 | Popular Tags |