1 17 package org.apache.activemq.broker.util; 18 19 import org.apache.activemq.ActiveMQConnectionFactory; 20 import org.apache.activemq.Service; 21 import org.apache.activemq.advisory.AdvisorySupport; 22 import org.apache.activemq.util.ServiceStopper; 23 import org.apache.commons.logging.Log; 24 import org.apache.commons.logging.LogFactory; 25 import org.springframework.beans.factory.DisposableBean; 26 import org.springframework.beans.factory.InitializingBean; 27 import org.springframework.beans.factory.FactoryBean; 28 29 import javax.jms.Connection ; 30 import javax.jms.ConnectionFactory ; 31 import javax.jms.Destination ; 32 import javax.jms.JMSException ; 33 import javax.jms.MessageConsumer ; 34 import javax.jms.Session ; 35 36 42 public class CommandAgent implements Service, InitializingBean, DisposableBean, FactoryBean { 43 private static final Log log = LogFactory.getLog(CommandAgent.class); 44 45 private String brokerUrl = "vm://localhost"; 46 private ConnectionFactory connectionFactory; 47 private Connection connection; 48 private Destination commandDestination; 49 private CommandMessageListener listener; 50 private Session session; 51 private MessageConsumer consumer; 52 53 54 public void start() throws Exception { 55 session = getConnection().createSession(false, Session.AUTO_ACKNOWLEDGE); 56 listener = new CommandMessageListener(session); 57 Destination destination = getCommandDestination(); 58 if (log.isDebugEnabled()) { 59 log.debug("Agent subscribing to control destination: " + destination); 60 } 61 consumer = session.createConsumer(destination); 62 consumer.setMessageListener(listener); 63 } 64 65 public void stop() throws Exception { 66 ServiceStopper stopper = new ServiceStopper(); 67 if (consumer != null) { 68 try { 69 consumer.close(); 70 consumer = null; 71 } 72 catch (JMSException e) { 73 stopper.onException(this, e); 74 } 75 } 76 if (session != null) { 77 try { 78 session.close(); 79 session = null; 80 } 81 catch (JMSException e) { 82 stopper.onException(this, e); 83 } 84 } 85 if (connection != null) { 86 try { 87 connection.close(); 88 connection = null; 89 } 90 catch (JMSException e) { 91 stopper.onException(this, e); 92 } 93 } 94 stopper.throwFirstException(); 95 } 96 97 public void afterPropertiesSet() throws Exception { 100 start(); 101 } 102 103 public void destroy() throws Exception { 104 stop(); 105 } 106 107 public Object getObject() throws Exception { 108 return this; 109 } 110 111 public Class getObjectType() { 112 return getClass(); 113 } 114 115 public boolean isSingleton() { 116 return true; 117 } 118 119 120 121 public String getBrokerUrl() { 124 return brokerUrl; 125 } 126 127 public void setBrokerUrl(String brokerUrl) { 128 this.brokerUrl = brokerUrl; 129 } 130 131 public ConnectionFactory getConnectionFactory() { 132 if (connectionFactory == null) { 133 connectionFactory = new ActiveMQConnectionFactory(brokerUrl); 134 } 135 return connectionFactory; 136 } 137 138 public void setConnectionFactory(ConnectionFactory connectionFactory) { 139 this.connectionFactory = connectionFactory; 140 } 141 142 public Connection getConnection() throws JMSException { 143 if (connection == null) { 144 connection = createConnection(); 145 connection.start(); 146 } 147 return connection; 148 } 149 150 public void setConnection(Connection connection) { 151 this.connection = connection; 152 } 153 154 public Destination getCommandDestination() { 155 if (commandDestination == null) { 156 commandDestination = createCommandDestination(); 157 } 158 return commandDestination; 159 } 160 161 public void setCommandDestination(Destination commandDestination) { 162 this.commandDestination = commandDestination; 163 } 164 165 protected Connection createConnection() throws JMSException { 166 return getConnectionFactory().createConnection(); 167 } 168 169 protected Destination createCommandDestination() { 170 return AdvisorySupport.getAgentDestination(); 171 } 172 } 173 | Popular Tags |