KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > activemq > broker > TransportConnector


1 /**
2  *
3  * Licensed to the Apache Software Foundation (ASF) under one or more
4  * contributor license agreements. See the NOTICE file distributed with
5  * this work for additional information regarding copyright ownership.
6  * The ASF licenses this file to You under the Apache License, Version 2.0
7  * (the "License"); you may not use this file except in compliance with
8  * the License. You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */

18 package org.apache.activemq.broker;
19
20 import java.io.IOException JavaDoc;
21 import java.net.URI JavaDoc;
22 import java.net.URISyntaxException JavaDoc;
23 import java.util.Iterator JavaDoc;
24
25 import javax.management.MBeanServer JavaDoc;
26 import javax.management.ObjectName JavaDoc;
27
28 import org.apache.activemq.broker.jmx.ManagedTransportConnector;
29 import org.apache.activemq.broker.region.ConnectorStatistics;
30 import org.apache.activemq.command.BrokerInfo;
31 import org.apache.activemq.security.MessageAuthorizationPolicy;
32 import org.apache.activemq.thread.TaskRunnerFactory;
33 import org.apache.activemq.transport.Transport;
34 import org.apache.activemq.transport.TransportAcceptListener;
35 import org.apache.activemq.transport.TransportFactory;
36 import org.apache.activemq.transport.TransportServer;
37 import org.apache.activemq.transport.discovery.DiscoveryAgent;
38 import org.apache.activemq.transport.discovery.DiscoveryAgentFactory;
39 import org.apache.activemq.util.ServiceStopper;
40 import org.apache.activemq.util.ServiceSupport;
41 import org.apache.commons.logging.Log;
42 import org.apache.commons.logging.LogFactory;
43
44 import java.util.concurrent.CopyOnWriteArrayList JavaDoc;
45
46 /**
47  * @org.apache.xbean.XBean
48  *
49  * @version $Revision: 1.6 $
50  */

51 public class TransportConnector implements Connector {
52
53     private static final Log log = LogFactory.getLog(TransportConnector.class);
54
55     private Broker broker;
56     private TransportServer server;
57     private URI JavaDoc uri;
58     private BrokerInfo brokerInfo = new BrokerInfo();
59     private TaskRunnerFactory taskRunnerFactory = null;
60     private MessageAuthorizationPolicy messageAuthorizationPolicy;
61     private DiscoveryAgent discoveryAgent;
62     protected CopyOnWriteArrayList JavaDoc connections = new CopyOnWriteArrayList JavaDoc();
63     protected TransportStatusDetector statusDector;
64     private ConnectorStatistics statistics = new ConnectorStatistics();
65     private URI JavaDoc discoveryUri;
66     private URI JavaDoc connectUri;
67     private String JavaDoc name;
68     private boolean disableAsyncDispatch=false;
69     private boolean enableStatusMonitor = true;
70
71
72     /**
73      * @return Returns the connections.
74      */

75     public CopyOnWriteArrayList JavaDoc getConnections(){
76         return connections;
77     }
78
79     public TransportConnector(){
80     }
81     
82
83     public TransportConnector(Broker broker,TransportServer server){
84         this();
85         setBroker(broker);
86         setServer(server);
87         if (server!=null&&server.getConnectURI()!=null){
88             URI JavaDoc uri = server.getConnectURI();
89             if (uri != null && uri.getScheme().equals("vm")){
90                 setEnableStatusMonitor(false);
91             }
92         }
93         
94     }
95
96     /**
97      * Factory method to create a JMX managed version of this transport connector
98      */

99     public ManagedTransportConnector asManagedConnector(MBeanServer JavaDoc mbeanServer, ObjectName JavaDoc connectorName) throws IOException JavaDoc, URISyntaxException JavaDoc {
100         ManagedTransportConnector rc = new ManagedTransportConnector(mbeanServer, connectorName, getBroker(), getServer());
101         rc.setTaskRunnerFactory(getTaskRunnerFactory());
102         rc.setUri(uri);
103         rc.setConnectUri(connectUri);
104         rc.setDiscoveryAgent(discoveryAgent);
105         rc.setDiscoveryUri(discoveryUri);
106         rc.setName(name);
107         rc.setDisableAsyncDispatch(disableAsyncDispatch);
108         rc.setBrokerInfo(brokerInfo);
109         return rc;
110     }
111     
112     public BrokerInfo getBrokerInfo() {
113         return brokerInfo;
114     }
115
116     public void setBrokerInfo(BrokerInfo brokerInfo) {
117         this.brokerInfo = brokerInfo;
118     }
119
120     public TransportServer getServer() throws IOException JavaDoc, URISyntaxException JavaDoc {
121         if (server == null) {
122             setServer(createTransportServer());
123         }
124         return server;
125     }
126
127     public Broker getBroker() {
128         return broker;
129     }
130
131     public void setBroker(Broker broker) {
132         this.broker = broker;
133         brokerInfo.setBrokerId(broker.getBrokerId());
134         brokerInfo.setPeerBrokerInfos(broker.getPeerBrokerInfos());
135     }
136     
137     public void setBrokerName(String JavaDoc brokerName) {
138         brokerInfo.setBrokerName(brokerName);
139     }
140
141     public void setServer(TransportServer server) {
142         this.server = server;
143         this.brokerInfo.setBrokerURL(server.getConnectURI().toString());
144         this.server.setAcceptListener(new TransportAcceptListener() {
145             public void onAccept(Transport transport) {
146                 try {
147                     Connection connection = createConnection(transport);
148                     connection.start();
149                 }
150                 catch (Exception JavaDoc e) {
151                     String JavaDoc remoteHost = transport.getRemoteAddress();
152                     ServiceSupport.dispose(transport);
153                     onAcceptError(e, remoteHost);
154                 }
155             }
156
157             public void onAcceptError(Exception JavaDoc error) {
158                 onAcceptError(error,null);
159             }
160
161             private void onAcceptError(Exception JavaDoc error, String JavaDoc remoteHost) {
162                 log.error("Could not accept connection " +
163                     (remoteHost == null ? "" : "from " + remoteHost)
164                     + ": " + error, error);
165             }
166         });
167         this.server.setBrokerInfo(brokerInfo);
168     }
169
170     public URI JavaDoc getUri() {
171         if( uri == null ) {
172             try {
173                 uri = getConnectUri();
174             } catch (Throwable JavaDoc e) {
175             }
176         }
177         return uri;
178     }
179
180     /**
181      * Sets the server transport URI to use if there is not a
182      * {@link TransportServer} configured via the
183      * {@link #setServer(TransportServer)} method. This value is used to lazy
184      * create a {@link TransportServer} instance
185      *
186      * @param uri
187      */

188     public void setUri(URI JavaDoc uri) {
189         this.uri = uri;
190     }
191
192     public TaskRunnerFactory getTaskRunnerFactory() {
193         return taskRunnerFactory;
194     }
195
196     public void setTaskRunnerFactory(TaskRunnerFactory taskRunnerFactory) {
197         this.taskRunnerFactory = taskRunnerFactory;
198     }
199
200     /**
201      * @return the statistics for this connector
202      */

203     public ConnectorStatistics getStatistics() {
204         return statistics;
205     }
206     
207     public MessageAuthorizationPolicy getMessageAuthorizationPolicy() {
208         return messageAuthorizationPolicy;
209     }
210
211     /**
212      * Sets the policy used to decide if the current connection is authorized to consume
213      * a given message
214      */

215     public void setMessageAuthorizationPolicy(MessageAuthorizationPolicy messageAuthorizationPolicy) {
216         this.messageAuthorizationPolicy = messageAuthorizationPolicy;
217     }
218
219     public void start() throws Exception JavaDoc {
220         getServer().start();
221         DiscoveryAgent da = getDiscoveryAgent();
222         if( da!=null ) {
223             da.setBrokerName(getBrokerInfo().getBrokerName());
224             da.registerService(getConnectUri().toString());
225             da.start();
226         }
227         if (enableStatusMonitor){
228             this.statusDector = new TransportStatusDetector(this);
229             this.statusDector.start();
230         }
231         
232         log.info("Connector "+getName()+" Started");
233     }
234
235     public void stop() throws Exception JavaDoc {
236         ServiceStopper ss = new ServiceStopper();
237         if( discoveryAgent!=null ) {
238             ss.stop(discoveryAgent);
239         }
240         if (server != null) {
241             ss.stop(server);
242         }
243         if (this.statusDector != null){
244             this.statusDector.stop();
245         }
246         
247         for (Iterator JavaDoc iter = connections.iterator(); iter.hasNext();) {
248             TransportConnection c = (TransportConnection) iter.next();
249             ss.stop(c);
250         }
251         ss.throwFirstException();
252         log.info("Connector "+getName()+" Stopped");
253     }
254
255     // Implementation methods
256
// -------------------------------------------------------------------------
257
protected Connection createConnection(Transport transport) throws IOException JavaDoc {
258         TransportConnection answer = new TransportConnection(this, transport, broker, disableAsyncDispatch ? null : taskRunnerFactory);
259         boolean statEnabled = this.getStatistics().isEnabled();
260         answer.getStatistics().setEnabled(statEnabled);
261         answer.setMessageAuthorizationPolicy(messageAuthorizationPolicy);
262         return answer;
263     }
264
265     protected TransportServer createTransportServer() throws IOException JavaDoc, URISyntaxException JavaDoc {
266         if (uri == null) {
267             throw new IllegalArgumentException JavaDoc("You must specify either a server or uri property");
268         }
269         if (broker == null) {
270             throw new IllegalArgumentException JavaDoc("You must specify the broker property. Maybe this connector should be added to a broker?");
271         }
272         return TransportFactory.bind(broker.getBrokerId().getValue(),uri);
273     }
274     
275     public DiscoveryAgent getDiscoveryAgent() throws IOException JavaDoc {
276         if( discoveryAgent==null ) {
277             discoveryAgent = createDiscoveryAgent();
278         }
279         return discoveryAgent;
280     }
281
282     protected DiscoveryAgent createDiscoveryAgent() throws IOException JavaDoc {
283         if( discoveryUri!=null ) {
284             return DiscoveryAgentFactory.createDiscoveryAgent(discoveryUri);
285         }
286         return null;
287     }
288
289     public void setDiscoveryAgent(DiscoveryAgent discoveryAgent) {
290         this.discoveryAgent = discoveryAgent;
291     }
292
293     public URI JavaDoc getDiscoveryUri() {
294         return discoveryUri;
295     }
296
297     public void setDiscoveryUri(URI JavaDoc discoveryUri) {
298         this.discoveryUri = discoveryUri;
299     }
300
301     public URI JavaDoc getConnectUri() throws IOException JavaDoc, URISyntaxException JavaDoc {
302         if( connectUri==null ) {
303             if( server !=null ) {
304                 connectUri = server.getConnectURI();
305             }
306         }
307         return connectUri;
308     }
309
310     public void setConnectUri(URI JavaDoc transportUri) {
311         this.connectUri = transportUri;
312     }
313
314     public void onStarted(TransportConnection connection) {
315         connections.add(connection);
316     }
317
318     public void onStopped(TransportConnection connection) {
319         connections.remove(connection);
320     }
321
322     public String JavaDoc getName(){
323         if( name==null ){
324             uri = getUri();
325             if( uri != null ) {
326                 name = uri.toString();
327             }
328         }
329         return name;
330     }
331     public void setName(String JavaDoc name) {
332         this.name = name;
333     }
334
335     public String JavaDoc toString() {
336         String JavaDoc rc = getName();
337         if( rc == null )
338             rc = super.toString();
339         return rc;
340     }
341
342     public boolean isDisableAsyncDispatch() {
343         return disableAsyncDispatch;
344     }
345
346     public void setDisableAsyncDispatch(boolean disableAsyncDispatch) {
347         this.disableAsyncDispatch = disableAsyncDispatch;
348     }
349
350     /**
351      * @return the enableStatusMonitor
352      */

353     public boolean isEnableStatusMonitor(){
354         return enableStatusMonitor;
355     }
356
357     /**
358      * @param enableStatusMonitor the enableStatusMonitor to set
359      */

360     public void setEnableStatusMonitor(boolean enableStatusMonitor){
361         this.enableStatusMonitor=enableStatusMonitor;
362     }
363 }
364
Popular Tags