1 17 package org.apache.activemq.broker.util; 18 19 import org.apache.activemq.command.ActiveMQTextMessage; 20 import org.apache.activemq.util.FactoryFinder; 21 import org.apache.commons.logging.Log; 22 import org.apache.commons.logging.LogFactory; 23 24 import javax.jms.Destination ; 25 import javax.jms.JMSException ; 26 import javax.jms.Message ; 27 import javax.jms.MessageListener ; 28 import javax.jms.MessageProducer ; 29 import javax.jms.Session ; 30 import javax.jms.TextMessage ; 31 import java.io.BufferedReader ; 32 import java.io.IOException ; 33 import java.io.InputStreamReader ; 34 35 38 public class CommandMessageListener implements MessageListener { 39 private static final Log log = LogFactory.getLog(CommandMessageListener.class); 40 41 private Session session; 42 private MessageProducer producer; 43 private CommandHandler handler; 44 45 public CommandMessageListener(Session session) { 46 this.session = session; 47 } 48 49 public void onMessage(Message message) { 50 if (log.isDebugEnabled()) { 51 log.debug("Received command: " + message); 52 } 53 if (message instanceof TextMessage) { 54 TextMessage request = (TextMessage) message; 55 try { 56 Destination replyTo = message.getJMSReplyTo(); 57 if (replyTo == null) { 58 log.warn("Ignored message as no JMSReplyTo set: " + message); 59 return; 60 } 61 Message response = processCommand(request); 62 addReplyHeaders(request, response); 63 getProducer().send(replyTo, response); 64 } 65 catch (Exception e) { 66 log.error("Failed to process message due to: " + e + ". Message: " + message, e); 67 } 68 } 69 else { 70 log.warn("Ignoring invalid message: " + message); 71 } 72 } 73 74 protected void addReplyHeaders(TextMessage request, Message response) throws JMSException { 75 String correlationID = request.getJMSCorrelationID(); 76 if (correlationID != null) { 77 response.setJMSCorrelationID(correlationID); 78 } 79 } 80 81 84 public Message processCommand(TextMessage request) throws Exception { 85 TextMessage response = session.createTextMessage(); 86 getHandler().processCommand(request, response); 87 return response; 88 } 89 90 93 public String processCommandText(String line) throws Exception { 94 TextMessage request = new ActiveMQTextMessage(); 95 request.setText(line); 96 TextMessage response = new ActiveMQTextMessage(); 97 getHandler().processCommand(request, response); 98 return response.getText(); 99 } 100 101 public Session getSession() { 102 return session; 103 } 104 105 public MessageProducer getProducer() throws JMSException { 106 if (producer == null) { 107 producer = getSession().createProducer(null); 108 } 109 return producer; 110 } 111 112 public CommandHandler getHandler() throws IllegalAccessException , IOException , InstantiationException , ClassNotFoundException { 113 if (handler == null) { 114 handler = createHandler(); 115 } 116 return handler; 117 } 118 119 private CommandHandler createHandler() throws IllegalAccessException , IOException , ClassNotFoundException , InstantiationException { 120 FactoryFinder factoryFinder = new FactoryFinder("META-INF/services/org/apache/activemq/broker/"); 121 return (CommandHandler) factoryFinder.newInstance("agent"); 122 } 123 } 124 | Popular Tags |