KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > servicemix > jbi > nmr > DefaultBroker


1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one or more
3  * contributor license agreements. See the NOTICE file distributed with
4  * this work for additional information regarding copyright ownership.
5  * The ASF licenses this file to You under the Apache License, Version 2.0
6  * (the "License"); you may not use this file except in compliance with
7  * the License. You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */

17 package org.apache.servicemix.jbi.nmr;
18
19 import java.util.ArrayList JavaDoc;
20 import java.util.List JavaDoc;
21
22 import javax.jbi.JBIException;
23 import javax.jbi.component.Component;
24 import javax.jbi.messaging.MessageExchange;
25 import javax.jbi.messaging.MessagingException;
26 import javax.jbi.messaging.MessageExchange.Role;
27 import javax.jbi.servicedesc.ServiceEndpoint;
28 import javax.management.JMException JavaDoc;
29 import javax.management.MBeanOperationInfo JavaDoc;
30 import javax.resource.spi.work.WorkManager JavaDoc;
31 import javax.xml.namespace.QName JavaDoc;
32
33 import org.apache.commons.logging.Log;
34 import org.apache.commons.logging.LogFactory;
35 import org.apache.servicemix.jbi.container.ActivationSpec;
36 import org.apache.servicemix.jbi.container.JBIContainer;
37 import org.apache.servicemix.jbi.framework.ComponentContextImpl;
38 import org.apache.servicemix.jbi.framework.ComponentMBeanImpl;
39 import org.apache.servicemix.jbi.framework.ComponentNameSpace;
40 import org.apache.servicemix.jbi.framework.Registry;
41 import org.apache.servicemix.jbi.management.BaseSystemService;
42 import org.apache.servicemix.jbi.management.ManagementContext;
43 import org.apache.servicemix.jbi.management.OperationInfoHelper;
44 import org.apache.servicemix.jbi.messaging.MessageExchangeImpl;
45 import org.apache.servicemix.jbi.nmr.flow.DefaultFlowChooser;
46 import org.apache.servicemix.jbi.nmr.flow.Flow;
47 import org.apache.servicemix.jbi.nmr.flow.FlowChooser;
48 import org.apache.servicemix.jbi.nmr.flow.FlowProvider;
49 import org.apache.servicemix.jbi.resolver.ConsumerComponentEndpointFilter;
50 import org.apache.servicemix.jbi.resolver.EndpointChooser;
51 import org.apache.servicemix.jbi.resolver.EndpointFilter;
52 import org.apache.servicemix.jbi.resolver.EndpointResolver;
53 import org.apache.servicemix.jbi.resolver.FirstChoicePolicy;
54 import org.apache.servicemix.jbi.resolver.ProducerComponentEndpointFilter;
55 import org.apache.servicemix.jbi.servicedesc.AbstractServiceEndpoint;
56 import org.apache.servicemix.jbi.servicedesc.ExternalEndpoint;
57 import org.apache.servicemix.jbi.servicedesc.InternalEndpoint;
58 import org.apache.servicemix.jbi.servicedesc.LinkedEndpoint;
59
60 /**
61  * The Broker handles Nomalised Message Routing within ServiceMix
62  *
63  * @version $Revision: 384328 $
64  */

65 public class DefaultBroker extends BaseSystemService implements Broker {
66
67     private Registry registry;
68     private String JavaDoc flowNames = "seda";
69     private String JavaDoc subscriptionFlowName = null;
70     private WorkManager JavaDoc workManager;
71     private Flow[] flows;
72     private final static Log log = LogFactory.getLog(DefaultBroker.class);
73     private EndpointChooser defaultServiceChooser = new FirstChoicePolicy();
74     private EndpointChooser defaultInterfaceChooser = new FirstChoicePolicy();
75     private SubscriptionManager subscriptionManager = new SubscriptionManager();
76     private FlowChooser defaultFlowChooser = new DefaultFlowChooser();
77
78     /**
79      * Constructor
80      */

81     public DefaultBroker() {
82     }
83
84     /**
85      * Get the description
86      *
87      * @return description
88      */

89     public String JavaDoc getDescription() {
90         return "Normalized Message Router";
91     }
92
93     /**
94      * @return Returns the workManager.
95      */

96     public WorkManager JavaDoc getWorkManager() {
97         return workManager;
98     }
99
100     /**
101      * @param workManager
102      * The workManager to set.
103      */

104     public void setWorkManager(WorkManager JavaDoc workManager) {
105         this.workManager = workManager;
106     }
107
108     public SubscriptionManager getSubscriptionManager() {
109         return subscriptionManager;
110     }
111
112     /**
113      * Sets the subscription manager
114      */

115     public void setSubscriptionManager(SubscriptionManager subscriptionManager) {
116         this.subscriptionManager = subscriptionManager;
117     }
118
119     /**
120      * initialize the broker
121      *
122      * @param container
123      * @throws JBIException
124      */

125     public void init(JBIContainer container) throws JBIException {
126         super.init(container);
127         this.workManager = container.getWorkManager();
128         this.registry = container.getRegistry();
129         // Create and initialize flows
130
if (this.flows == null) {
131             String JavaDoc[] names = flowNames.split(",");
132             flows = new Flow[names.length];
133             for (int i = 0; i < names.length; i++) {
134                 flows[i] = FlowProvider.getFlow(names[i]);
135                 flows[i].init(this);
136             }
137         } else {
138             for (int i = 0; i < flows.length; i++) {
139                 flows[i].init(this);
140             }
141         }
142         subscriptionManager.init(this, registry);
143     }
144     
145     protected Class JavaDoc getServiceMBean() {
146         return BrokerMBean.class;
147     }
148
149     /**
150      * Get the name of the Container
151      *
152      * @return containerName
153      */

154     public String JavaDoc getContainerName() {
155         return container.getName();
156     }
157
158     /**
159      * Get the ManagementContext
160      *
161      * @return the managementContext
162      */

163     public ManagementContext getManagementContext() {
164         return container.getManagementContext();
165     }
166
167     /**
168      * Get the Registry
169      *
170      * @return the registry
171      */

172     public Registry getRegistry() {
173         return registry;
174     }
175
176     /**
177      * start brokering
178      *
179      * @throws JBIException
180      */

181     public void start() throws JBIException {
182         for (int i = 0; i < flows.length; i++) {
183             flows[i].start();
184         }
185         super.start();
186     }
187
188     /**
189      * stop brokering
190      *
191      * @throws JBIException
192      */

193     public void stop() throws JBIException {
194         for (int i = 0; i < flows.length; i++) {
195             flows[i].stop();
196         }
197         super.stop();
198     }
199
200     /**
201      * shutdown all Components
202      *
203      * @throws JBIException
204      */

205     public void shutDown() throws JBIException {
206         stop();
207         for (int i = 0; i < flows.length; i++) {
208             flows[i].shutDown();
209         }
210         super.shutDown();
211         container.getManagementContext().unregisterMBean(this);
212     }
213
214     /**
215      * @return Returns the flow.
216      */

217     public String JavaDoc getFlowNames() {
218         return flowNames;
219     }
220
221     /**
222      * @param flowName
223      * The flow to set.
224      */

225     public void setFlowNames(String JavaDoc flowNames) {
226         this.flowNames = flowNames;
227     }
228
229     /**
230      * @return the subscriptionFlowName
231      */

232     public String JavaDoc getSubscriptionFlowName() {
233         return subscriptionFlowName;
234     }
235
236     /**
237      * Set the subscription flow name
238      * @param subscriptionFlowName
239      */

240     public void setSubscriptionFlowName(String JavaDoc subscriptionFlowName) {
241         this.subscriptionFlowName = subscriptionFlowName;
242     }
243
244     /**
245      * Set the flow
246      *
247      * @param flow
248      */

249     public void setFlows(Flow[] flows) {
250         this.flows = flows;
251     }
252
253     /**
254      * @return the Flow
255      */

256     public Flow[] getFlows() {
257         return this.flows;
258     }
259
260     /**
261      * suspend the flow to prevent any message exchanges
262      */

263     public void suspend() {
264         for (int i = 0; i < flows.length; i++) {
265             flows[i].suspend();
266         }
267     }
268
269     /**
270      * resume message exchange processing
271      */

272     public void resume() {
273         for (int i = 0; i < flows.length; i++) {
274             flows[i].resume();
275         }
276     }
277
278     /**
279      * Route an ExchangePacket to a destination
280      *
281      * @param exchange
282      * @throws JBIException
283      */

284     public void sendExchangePacket(MessageExchange me) throws JBIException {
285         MessageExchangeImpl exchange = (MessageExchangeImpl) me;
286         if (exchange.getRole() == Role.PROVIDER && exchange.getDestinationId() == null) {
287             resolveAddress(exchange);
288         }
289
290         boolean foundRoute = false;
291         // If we found a destination, or this is a reply
292
if (exchange.getEndpoint() != null || exchange.getRole() == Role.CONSUMER) {
293             foundRoute = true;
294             Flow flow = defaultFlowChooser.chooseFlow(flows, exchange);
295             if (flow == null) {
296                 throw new MessagingException("Unable to choose a flow for exchange: " + exchange);
297             }
298             flow.send(exchange);
299         }
300
301         if (exchange.getRole() == Role.PROVIDER) {
302             getSubscriptionManager().dispatchToSubscribers(exchange);
303         }
304         
305         if (!foundRoute) {
306             boolean throwException = true;
307             ActivationSpec activationSpec = exchange.getActivationSpec();
308             if (activationSpec != null) {
309                 throwException = activationSpec.isFailIfNoDestinationEndpoint();
310             }
311             if (throwException) {
312                 throw new MessagingException("Could not find route for exchange: " + exchange + " for service: " + exchange.getService() + " and interface: "
313                         + exchange.getInterfaceName());
314             } else if (exchange.getMirror().getSyncState() == MessageExchangeImpl.SYNC_STATE_SYNC_SENT) {
315                 exchange.handleAccept();
316                 ComponentContextImpl ctx = (ComponentContextImpl) getSubscriptionManager().getContext();
317                 exchange.setDestinationId(ctx.getComponentNameSpace());
318                 // TODO: this will fail if exchange is InOut
319
getSubscriptionManager().done(exchange);
320             }
321         }
322     }
323     
324     protected void resolveAddress(MessageExchangeImpl exchange) throws JBIException {
325         ServiceEndpoint theEndpoint = exchange.getEndpoint();
326         if (theEndpoint != null) {
327             if (theEndpoint instanceof ExternalEndpoint) {
328                 throw new JBIException("External endpoints can not be used for routing: should be an internal or dynamic endpoint.");
329             }
330             if (theEndpoint instanceof AbstractServiceEndpoint == false) {
331                 throw new JBIException("Component-specific endpoints can not be used for routing: should be an internal or dynamic endpoint.");
332             }
333         }
334         // Resolve linked endpoints
335
if (theEndpoint instanceof LinkedEndpoint) {
336             QName JavaDoc svcName = ((LinkedEndpoint) theEndpoint).getToService();
337             String JavaDoc epName = ((LinkedEndpoint) theEndpoint).getToEndpoint();
338             ServiceEndpoint ep = registry.getInternalEndpoint(svcName, epName);
339             if (ep == null) {
340                 throw new JBIException("Could not resolve linked endpoint: " + theEndpoint);
341             }
342             theEndpoint = ep;
343         }
344
345         // get the context which created the exchange
346
ComponentContextImpl context = exchange.getSourceContext();
347         if (theEndpoint == null) {
348             QName JavaDoc serviceName = exchange.getService();
349             QName JavaDoc interfaceName = exchange.getInterfaceName();
350             
351             // check in order, ServiceName then InterfaceName
352
// check to see if there is a match on the serviceName
353
if (serviceName != null) {
354                 ServiceEndpoint[] endpoints = registry.getEndpointsForService(serviceName);
355                 endpoints = getMatchingEndpoints(endpoints, exchange);
356                 theEndpoint = getServiceChooser(exchange).chooseEndpoint(endpoints, context, exchange);
357                 if (theEndpoint == null) {
358                     log.warn("ServiceName (" + serviceName + ") specified for routing, but can't find it registered");
359                 }
360             }
361             if (theEndpoint == null && interfaceName != null) {
362                 ServiceEndpoint[] endpoints = registry.getEndpointsForInterface(interfaceName);
363                 endpoints = getMatchingEndpoints(endpoints, exchange);
364                 theEndpoint = (InternalEndpoint) getInterfaceChooser(exchange).chooseEndpoint(endpoints, context, exchange);
365                 if (theEndpoint == null) {
366                     log.warn("InterfaceName (" + interfaceName + ") specified for routing, but can't find any matching components");
367                 }
368             }
369             if (theEndpoint == null) {
370                 // lets use the resolver on the activation spec if
371
// applicable
372
ActivationSpec activationSpec = exchange.getActivationSpec();
373                 if (activationSpec != null) {
374                     EndpointResolver destinationResolver = activationSpec.getDestinationResolver();
375                     if (destinationResolver != null) {
376                         try {
377                             EndpointFilter filter = createEndpointFilter(context, exchange);
378                             theEndpoint = (InternalEndpoint) destinationResolver.resolveEndpoint(context, exchange, filter);
379                         }
380                         catch (JBIException e) {
381                             throw new MessagingException("Failed to resolve endpoint: " + e, e);
382                         }
383                     }
384                 }
385             }
386         }
387         if (theEndpoint != null) {
388             exchange.setEndpoint(theEndpoint);
389         }
390         if (log.isTraceEnabled()) {
391             log.trace("Routing exchange " + exchange + " to: " + theEndpoint);
392         }
393     }
394
395     /**
396      * Filter the given endpoints by asking to the provider and consumer
397      * if they are both ok to process the exchange.
398      *
399      * @param endpoints an array of internal endpoints to check
400      * @param exchange the exchange that will be serviced
401      * @return an array of endpoints on which both consumer and provider agrees
402      */

403     protected ServiceEndpoint[] getMatchingEndpoints(ServiceEndpoint[] endpoints, MessageExchangeImpl exchange) {
404         List JavaDoc filtered = new ArrayList JavaDoc();
405         ComponentMBeanImpl consumer = getRegistry().getComponent(exchange.getSourceId());
406         
407         for (int i = 0; i < endpoints.length; i++) {
408             ComponentNameSpace id = ((InternalEndpoint) endpoints[i]).getComponentNameSpace();
409             if (id != null) {
410                 ComponentMBeanImpl provider = getRegistry().getComponent(id);
411                 if (provider != null) {
412                     if (!consumer.getComponent().isExchangeWithProviderOkay(endpoints[i], exchange) ||
413                         !provider.getComponent().isExchangeWithConsumerOkay(endpoints[i], exchange)) {
414                         continue;
415                     }
416                 }
417             }
418             filtered.add(endpoints[i]);
419         }
420         return (ServiceEndpoint[]) filtered.toArray(new ServiceEndpoint[filtered.size()]);
421     }
422
423     /**
424      * @return the default EndpointChooser
425      */

426     public EndpointChooser getDefaultInterfaceChooser() {
427         return defaultInterfaceChooser;
428     }
429
430     /**
431      * Set the default EndpointChooser
432      *
433      * @param defaultInterfaceChooser
434      */

435     public void setDefaultInterfaceChooser(EndpointChooser defaultInterfaceChooser) {
436         this.defaultInterfaceChooser = defaultInterfaceChooser;
437     }
438
439     /**
440      * @return the default EndpointChooser
441      */

442     public EndpointChooser getDefaultServiceChooser() {
443         return defaultServiceChooser;
444     }
445
446     /**
447      * Set default EndpointChooser
448      *
449      * @param defaultServiceChooser
450      */

451     public void setDefaultServiceChooser(EndpointChooser defaultServiceChooser) {
452         this.defaultServiceChooser = defaultServiceChooser;
453     }
454
455     /**
456      * @return the defaultFlowChooser
457      */

458     public FlowChooser getDefaultFlowChooser() {
459         return defaultFlowChooser;
460     }
461
462     /**
463      * @param defaultFlowChooser the defaultFlowChooser to set
464      */

465     public void setDefaultFlowChooser(FlowChooser defaultFlowChooser) {
466         this.defaultFlowChooser = defaultFlowChooser;
467     }
468
469     /**
470      * Returns the endpoint chooser for endpoints found by service which will
471      * use the chooser on the exchange's activation spec if available otherwise
472      * will use the default
473      *
474      * @param exchange
475      * @return the EndpointChooser
476      */

477     protected EndpointChooser getServiceChooser(MessageExchangeImpl exchange) {
478         EndpointChooser chooser = null;
479         ActivationSpec activationSpec = exchange.getActivationSpec();
480         if (activationSpec != null) {
481             chooser = activationSpec.getServiceChooser();
482         }
483         if (chooser == null) {
484             chooser = defaultServiceChooser;
485         }
486         return chooser;
487     }
488
489     /**
490      * Returns the endpoint chooser for endpoints found by service which will
491      * use the chooser on the exchange's activation spec if available otherwise
492      * will use the default
493      *
494      * @param exchange
495      * @return the EndpointChooser
496      */

497     protected EndpointChooser getInterfaceChooser(MessageExchangeImpl exchange) {
498         EndpointChooser chooser = null;
499         ActivationSpec activationSpec = exchange.getActivationSpec();
500         if (activationSpec != null) {
501             chooser = activationSpec.getInterfaceChooser();
502         }
503         if (chooser == null) {
504             chooser = defaultInterfaceChooser;
505         }
506         return chooser;
507     }
508
509     /**
510      * Factory method to create an endpoint filter for the given component
511      * context and message exchange
512      *
513      * @param context
514      * @param exchange
515      * @return the EndpointFilter
516      */

517     protected EndpointFilter createEndpointFilter(ComponentContextImpl context, MessageExchangeImpl exchange) {
518         Component component = context.getComponent();
519         if (exchange.getRole() == Role.PROVIDER) {
520             return new ConsumerComponentEndpointFilter(component);
521         }
522         else {
523             return new ProducerComponentEndpointFilter(component);
524         }
525     }
526
527     /**
528      * Get an array of MBeanOperationInfo
529      *
530      * @return array of OperationInfos
531      * @throws JMException
532      */

533     public MBeanOperationInfo JavaDoc[] getOperationInfos() throws JMException JavaDoc {
534         OperationInfoHelper helper = new OperationInfoHelper();
535         helper.addOperation(getObjectToManage(), "suspend", "suspend the NMR processing");
536         helper.addOperation(getObjectToManage(), "resume", "resume the NMR processing");
537
538         return OperationInfoHelper.join(super.getOperationInfos(), helper.getOperationInfos());
539     }
540
541     public JBIContainer getContainer() {
542         return container;
543     }
544
545 }
Popular Tags