1 18 package org.apache.activemq.broker; 19 20 import java.io.IOException ; 21 import java.net.URI ; 22 import java.net.URISyntaxException ; 23 import java.util.Iterator ; 24 25 import javax.management.MBeanServer ; 26 import javax.management.ObjectName ; 27 28 import org.apache.activemq.broker.jmx.ManagedTransportConnector; 29 import org.apache.activemq.broker.region.ConnectorStatistics; 30 import org.apache.activemq.command.BrokerInfo; 31 import org.apache.activemq.security.MessageAuthorizationPolicy; 32 import org.apache.activemq.thread.TaskRunnerFactory; 33 import org.apache.activemq.transport.Transport; 34 import org.apache.activemq.transport.TransportAcceptListener; 35 import org.apache.activemq.transport.TransportFactory; 36 import org.apache.activemq.transport.TransportServer; 37 import org.apache.activemq.transport.discovery.DiscoveryAgent; 38 import org.apache.activemq.transport.discovery.DiscoveryAgentFactory; 39 import org.apache.activemq.util.ServiceStopper; 40 import org.apache.activemq.util.ServiceSupport; 41 import org.apache.commons.logging.Log; 42 import org.apache.commons.logging.LogFactory; 43 44 import java.util.concurrent.CopyOnWriteArrayList ; 45 46 51 public class TransportConnector implements Connector { 52 53 private static final Log log = LogFactory.getLog(TransportConnector.class); 54 55 private Broker broker; 56 private TransportServer server; 57 private URI uri; 58 private BrokerInfo brokerInfo = new BrokerInfo(); 59 private TaskRunnerFactory taskRunnerFactory = null; 60 private MessageAuthorizationPolicy messageAuthorizationPolicy; 61 private DiscoveryAgent discoveryAgent; 62 protected CopyOnWriteArrayList connections = new CopyOnWriteArrayList (); 63 protected TransportStatusDetector statusDector; 64 private ConnectorStatistics statistics = new ConnectorStatistics(); 65 private URI discoveryUri; 66 private URI connectUri; 67 private String name; 68 private boolean disableAsyncDispatch=false; 69 private boolean enableStatusMonitor = true; 70 71 72 75 public CopyOnWriteArrayList getConnections(){ 76 return connections; 77 } 78 79 public TransportConnector(){ 80 } 81 82 83 public TransportConnector(Broker broker,TransportServer server){ 84 this(); 85 setBroker(broker); 86 setServer(server); 87 if (server!=null&&server.getConnectURI()!=null){ 88 URI uri = server.getConnectURI(); 89 if (uri != null && uri.getScheme().equals("vm")){ 90 setEnableStatusMonitor(false); 91 } 92 } 93 94 } 95 96 99 public ManagedTransportConnector asManagedConnector(MBeanServer mbeanServer, ObjectName connectorName) throws IOException , URISyntaxException { 100 ManagedTransportConnector rc = new ManagedTransportConnector(mbeanServer, connectorName, getBroker(), getServer()); 101 rc.setTaskRunnerFactory(getTaskRunnerFactory()); 102 rc.setUri(uri); 103 rc.setConnectUri(connectUri); 104 rc.setDiscoveryAgent(discoveryAgent); 105 rc.setDiscoveryUri(discoveryUri); 106 rc.setName(name); 107 rc.setDisableAsyncDispatch(disableAsyncDispatch); 108 rc.setBrokerInfo(brokerInfo); 109 return rc; 110 } 111 112 public BrokerInfo getBrokerInfo() { 113 return brokerInfo; 114 } 115 116 public void setBrokerInfo(BrokerInfo brokerInfo) { 117 this.brokerInfo = brokerInfo; 118 } 119 120 public TransportServer getServer() throws IOException , URISyntaxException { 121 if (server == null) { 122 setServer(createTransportServer()); 123 } 124 return server; 125 } 126 127 public Broker getBroker() { 128 return broker; 129 } 130 131 public void setBroker(Broker broker) { 132 this.broker = broker; 133 brokerInfo.setBrokerId(broker.getBrokerId()); 134 brokerInfo.setPeerBrokerInfos(broker.getPeerBrokerInfos()); 135 } 136 137 public void setBrokerName(String brokerName) { 138 brokerInfo.setBrokerName(brokerName); 139 } 140 141 public void setServer(TransportServer server) { 142 this.server = server; 143 this.brokerInfo.setBrokerURL(server.getConnectURI().toString()); 144 this.server.setAcceptListener(new TransportAcceptListener() { 145 public void onAccept(Transport transport) { 146 try { 147 Connection connection = createConnection(transport); 148 connection.start(); 149 } 150 catch (Exception e) { 151 String remoteHost = transport.getRemoteAddress(); 152 ServiceSupport.dispose(transport); 153 onAcceptError(e, remoteHost); 154 } 155 } 156 157 public void onAcceptError(Exception error) { 158 onAcceptError(error,null); 159 } 160 161 private void onAcceptError(Exception error, String remoteHost) { 162 log.error("Could not accept connection " + 163 (remoteHost == null ? "" : "from " + remoteHost) 164 + ": " + error, error); 165 } 166 }); 167 this.server.setBrokerInfo(brokerInfo); 168 } 169 170 public URI getUri() { 171 if( uri == null ) { 172 try { 173 uri = getConnectUri(); 174 } catch (Throwable e) { 175 } 176 } 177 return uri; 178 } 179 180 188 public void setUri(URI uri) { 189 this.uri = uri; 190 } 191 192 public TaskRunnerFactory getTaskRunnerFactory() { 193 return taskRunnerFactory; 194 } 195 196 public void setTaskRunnerFactory(TaskRunnerFactory taskRunnerFactory) { 197 this.taskRunnerFactory = taskRunnerFactory; 198 } 199 200 203 public ConnectorStatistics getStatistics() { 204 return statistics; 205 } 206 207 public MessageAuthorizationPolicy getMessageAuthorizationPolicy() { 208 return messageAuthorizationPolicy; 209 } 210 211 215 public void setMessageAuthorizationPolicy(MessageAuthorizationPolicy messageAuthorizationPolicy) { 216 this.messageAuthorizationPolicy = messageAuthorizationPolicy; 217 } 218 219 public void start() throws Exception { 220 getServer().start(); 221 DiscoveryAgent da = getDiscoveryAgent(); 222 if( da!=null ) { 223 da.setBrokerName(getBrokerInfo().getBrokerName()); 224 da.registerService(getConnectUri().toString()); 225 da.start(); 226 } 227 if (enableStatusMonitor){ 228 this.statusDector = new TransportStatusDetector(this); 229 this.statusDector.start(); 230 } 231 232 log.info("Connector "+getName()+" Started"); 233 } 234 235 public void stop() throws Exception { 236 ServiceStopper ss = new ServiceStopper(); 237 if( discoveryAgent!=null ) { 238 ss.stop(discoveryAgent); 239 } 240 if (server != null) { 241 ss.stop(server); 242 } 243 if (this.statusDector != null){ 244 this.statusDector.stop(); 245 } 246 247 for (Iterator iter = connections.iterator(); iter.hasNext();) { 248 TransportConnection c = (TransportConnection) iter.next(); 249 ss.stop(c); 250 } 251 ss.throwFirstException(); 252 log.info("Connector "+getName()+" Stopped"); 253 } 254 255 protected Connection createConnection(Transport transport) throws IOException { 258 TransportConnection answer = new TransportConnection(this, transport, broker, disableAsyncDispatch ? null : taskRunnerFactory); 259 boolean statEnabled = this.getStatistics().isEnabled(); 260 answer.getStatistics().setEnabled(statEnabled); 261 answer.setMessageAuthorizationPolicy(messageAuthorizationPolicy); 262 return answer; 263 } 264 265 protected TransportServer createTransportServer() throws IOException , URISyntaxException { 266 if (uri == null) { 267 throw new IllegalArgumentException ("You must specify either a server or uri property"); 268 } 269 if (broker == null) { 270 throw new IllegalArgumentException ("You must specify the broker property. Maybe this connector should be added to a broker?"); 271 } 272 return TransportFactory.bind(broker.getBrokerId().getValue(),uri); 273 } 274 275 public DiscoveryAgent getDiscoveryAgent() throws IOException { 276 if( discoveryAgent==null ) { 277 discoveryAgent = createDiscoveryAgent(); 278 } 279 return discoveryAgent; 280 } 281 282 protected DiscoveryAgent createDiscoveryAgent() throws IOException { 283 if( discoveryUri!=null ) { 284 return DiscoveryAgentFactory.createDiscoveryAgent(discoveryUri); 285 } 286 return null; 287 } 288 289 public void setDiscoveryAgent(DiscoveryAgent discoveryAgent) { 290 this.discoveryAgent = discoveryAgent; 291 } 292 293 public URI getDiscoveryUri() { 294 return discoveryUri; 295 } 296 297 public void setDiscoveryUri(URI discoveryUri) { 298 this.discoveryUri = discoveryUri; 299 } 300 301 public URI getConnectUri() throws IOException , URISyntaxException { 302 if( connectUri==null ) { 303 if( server !=null ) { 304 connectUri = server.getConnectURI(); 305 } 306 } 307 return connectUri; 308 } 309 310 public void setConnectUri(URI transportUri) { 311 this.connectUri = transportUri; 312 } 313 314 public void onStarted(TransportConnection connection) { 315 connections.add(connection); 316 } 317 318 public void onStopped(TransportConnection connection) { 319 connections.remove(connection); 320 } 321 322 public String getName(){ 323 if( name==null ){ 324 uri = getUri(); 325 if( uri != null ) { 326 name = uri.toString(); 327 } 328 } 329 return name; 330 } 331 public void setName(String name) { 332 this.name = name; 333 } 334 335 public String toString() { 336 String rc = getName(); 337 if( rc == null ) 338 rc = super.toString(); 339 return rc; 340 } 341 342 public boolean isDisableAsyncDispatch() { 343 return disableAsyncDispatch; 344 } 345 346 public void setDisableAsyncDispatch(boolean disableAsyncDispatch) { 347 this.disableAsyncDispatch = disableAsyncDispatch; 348 } 349 350 353 public boolean isEnableStatusMonitor(){ 354 return enableStatusMonitor; 355 } 356 357 360 public void setEnableStatusMonitor(boolean enableStatusMonitor){ 361 this.enableStatusMonitor=enableStatusMonitor; 362 } 363 } 364 | Popular Tags |