|                                                                                                              1
 17  package org.apache.servicemix.jbi.nmr;
 18
 19  import java.util.ArrayList
  ; 20  import java.util.List
  ; 21
 22  import javax.jbi.JBIException;
 23  import javax.jbi.component.Component;
 24  import javax.jbi.messaging.MessageExchange;
 25  import javax.jbi.messaging.MessagingException;
 26  import javax.jbi.messaging.MessageExchange.Role;
 27  import javax.jbi.servicedesc.ServiceEndpoint;
 28  import javax.management.JMException
  ; 29  import javax.management.MBeanOperationInfo
  ; 30  import javax.resource.spi.work.WorkManager
  ; 31  import javax.xml.namespace.QName
  ; 32
 33  import org.apache.commons.logging.Log;
 34  import org.apache.commons.logging.LogFactory;
 35  import org.apache.servicemix.jbi.container.ActivationSpec;
 36  import org.apache.servicemix.jbi.container.JBIContainer;
 37  import org.apache.servicemix.jbi.framework.ComponentContextImpl;
 38  import org.apache.servicemix.jbi.framework.ComponentMBeanImpl;
 39  import org.apache.servicemix.jbi.framework.ComponentNameSpace;
 40  import org.apache.servicemix.jbi.framework.Registry;
 41  import org.apache.servicemix.jbi.management.BaseSystemService;
 42  import org.apache.servicemix.jbi.management.ManagementContext;
 43  import org.apache.servicemix.jbi.management.OperationInfoHelper;
 44  import org.apache.servicemix.jbi.messaging.MessageExchangeImpl;
 45  import org.apache.servicemix.jbi.nmr.flow.DefaultFlowChooser;
 46  import org.apache.servicemix.jbi.nmr.flow.Flow;
 47  import org.apache.servicemix.jbi.nmr.flow.FlowChooser;
 48  import org.apache.servicemix.jbi.nmr.flow.FlowProvider;
 49  import org.apache.servicemix.jbi.resolver.ConsumerComponentEndpointFilter;
 50  import org.apache.servicemix.jbi.resolver.EndpointChooser;
 51  import org.apache.servicemix.jbi.resolver.EndpointFilter;
 52  import org.apache.servicemix.jbi.resolver.EndpointResolver;
 53  import org.apache.servicemix.jbi.resolver.FirstChoicePolicy;
 54  import org.apache.servicemix.jbi.resolver.ProducerComponentEndpointFilter;
 55  import org.apache.servicemix.jbi.servicedesc.AbstractServiceEndpoint;
 56  import org.apache.servicemix.jbi.servicedesc.ExternalEndpoint;
 57  import org.apache.servicemix.jbi.servicedesc.InternalEndpoint;
 58  import org.apache.servicemix.jbi.servicedesc.LinkedEndpoint;
 59
 60
 65  public class DefaultBroker extends BaseSystemService implements Broker {
 66
 67      private Registry registry;
 68      private String
  flowNames = "seda"; 69      private String
  subscriptionFlowName = null; 70      private WorkManager
  workManager; 71      private Flow[] flows;
 72      private final static Log log = LogFactory.getLog(DefaultBroker.class);
 73      private EndpointChooser defaultServiceChooser = new FirstChoicePolicy();
 74      private EndpointChooser defaultInterfaceChooser = new FirstChoicePolicy();
 75      private SubscriptionManager subscriptionManager = new SubscriptionManager();
 76      private FlowChooser defaultFlowChooser = new DefaultFlowChooser();
 77
 78
 81      public DefaultBroker() {
 82      }
 83
 84
 89      public String
  getDescription() { 90          return "Normalized Message Router";
 91      }
 92
 93
 96      public WorkManager
  getWorkManager() { 97          return workManager;
 98      }
 99
 100
 104     public void setWorkManager(WorkManager
  workManager) { 105         this.workManager = workManager;
 106     }
 107
 108     public SubscriptionManager getSubscriptionManager() {
 109         return subscriptionManager;
 110     }
 111
 112
 115     public void setSubscriptionManager(SubscriptionManager subscriptionManager) {
 116         this.subscriptionManager = subscriptionManager;
 117     }
 118
 119
 125     public void init(JBIContainer container) throws JBIException {
 126         super.init(container);
 127         this.workManager = container.getWorkManager();
 128         this.registry = container.getRegistry();
 129                 if (this.flows == null) {
 131             String
  [] names = flowNames.split(","); 132             flows = new Flow[names.length];
 133             for (int i = 0; i < names.length; i++) {
 134                 flows[i] = FlowProvider.getFlow(names[i]);
 135                 flows[i].init(this);
 136             }
 137         } else {
 138             for (int i = 0; i < flows.length; i++) {
 139                 flows[i].init(this);
 140             }
 141         }
 142         subscriptionManager.init(this, registry);
 143     }
 144
 145     protected Class
  getServiceMBean() { 146         return BrokerMBean.class;
 147     }
 148
 149
 154     public String
  getContainerName() { 155         return container.getName();
 156     }
 157
 158
 163     public ManagementContext getManagementContext() {
 164         return container.getManagementContext();
 165     }
 166
 167
 172     public Registry getRegistry() {
 173         return registry;
 174     }
 175
 176
 181     public void start() throws JBIException {
 182         for (int i = 0; i < flows.length; i++) {
 183             flows[i].start();
 184         }
 185         super.start();
 186     }
 187
 188
 193     public void stop() throws JBIException {
 194         for (int i = 0; i < flows.length; i++) {
 195             flows[i].stop();
 196         }
 197         super.stop();
 198     }
 199
 200
 205     public void shutDown() throws JBIException {
 206         stop();
 207         for (int i = 0; i < flows.length; i++) {
 208             flows[i].shutDown();
 209         }
 210         super.shutDown();
 211         container.getManagementContext().unregisterMBean(this);
 212     }
 213
 214
 217     public String
  getFlowNames() { 218         return flowNames;
 219     }
 220
 221
 225     public void setFlowNames(String
  flowNames) { 226         this.flowNames = flowNames;
 227     }
 228
 229
 232     public String
  getSubscriptionFlowName() { 233         return subscriptionFlowName;
 234     }
 235
 236
 240     public void setSubscriptionFlowName(String
  subscriptionFlowName) { 241         this.subscriptionFlowName = subscriptionFlowName;
 242     }
 243
 244
 249     public void setFlows(Flow[] flows) {
 250         this.flows = flows;
 251     }
 252
 253
 256     public Flow[] getFlows() {
 257         return this.flows;
 258     }
 259
 260
 263     public void suspend() {
 264         for (int i = 0; i < flows.length; i++) {
 265             flows[i].suspend();
 266         }
 267     }
 268
 269
 272     public void resume() {
 273         for (int i = 0; i < flows.length; i++) {
 274             flows[i].resume();
 275         }
 276     }
 277
 278
 284     public void sendExchangePacket(MessageExchange me) throws JBIException {
 285         MessageExchangeImpl exchange = (MessageExchangeImpl) me;
 286         if (exchange.getRole() == Role.PROVIDER && exchange.getDestinationId() == null) {
 287             resolveAddress(exchange);
 288         }
 289
 290         boolean foundRoute = false;
 291                 if (exchange.getEndpoint() != null || exchange.getRole() == Role.CONSUMER) {
 293             foundRoute = true;
 294             Flow flow = defaultFlowChooser.chooseFlow(flows, exchange);
 295             if (flow == null) {
 296                 throw new MessagingException("Unable to choose a flow for exchange: " + exchange);
 297             }
 298             flow.send(exchange);
 299         }
 300
 301         if (exchange.getRole() == Role.PROVIDER) {
 302             getSubscriptionManager().dispatchToSubscribers(exchange);
 303         }
 304
 305         if (!foundRoute) {
 306             boolean throwException = true;
 307             ActivationSpec activationSpec = exchange.getActivationSpec();
 308             if (activationSpec != null) {
 309                 throwException = activationSpec.isFailIfNoDestinationEndpoint();
 310             }
 311             if (throwException) {
 312                 throw new MessagingException("Could not find route for exchange: " + exchange + " for service: " + exchange.getService() + " and interface: "
 313                         + exchange.getInterfaceName());
 314             } else if (exchange.getMirror().getSyncState() == MessageExchangeImpl.SYNC_STATE_SYNC_SENT) {
 315                 exchange.handleAccept();
 316                 ComponentContextImpl ctx = (ComponentContextImpl) getSubscriptionManager().getContext();
 317                 exchange.setDestinationId(ctx.getComponentNameSpace());
 318                                 getSubscriptionManager().done(exchange);
 320             }
 321         }
 322     }
 323
 324     protected void resolveAddress(MessageExchangeImpl exchange) throws JBIException {
 325         ServiceEndpoint theEndpoint = exchange.getEndpoint();
 326         if (theEndpoint != null) {
 327             if (theEndpoint instanceof ExternalEndpoint) {
 328                 throw new JBIException("External endpoints can not be used for routing: should be an internal or dynamic endpoint.");
 329             }
 330             if (theEndpoint instanceof AbstractServiceEndpoint == false) {
 331                 throw new JBIException("Component-specific endpoints can not be used for routing: should be an internal or dynamic endpoint.");
 332             }
 333         }
 334                 if (theEndpoint instanceof LinkedEndpoint) {
 336             QName
  svcName = ((LinkedEndpoint) theEndpoint).getToService(); 337             String
  epName = ((LinkedEndpoint) theEndpoint).getToEndpoint(); 338             ServiceEndpoint ep = registry.getInternalEndpoint(svcName, epName);
 339             if (ep == null) {
 340                 throw new JBIException("Could not resolve linked endpoint: " + theEndpoint);
 341             }
 342             theEndpoint = ep;
 343         }
 344
 345                 ComponentContextImpl context = exchange.getSourceContext();
 347         if (theEndpoint == null) {
 348             QName
  serviceName = exchange.getService(); 349             QName
  interfaceName = exchange.getInterfaceName(); 350
 351                                     if (serviceName != null) {
 354                 ServiceEndpoint[] endpoints = registry.getEndpointsForService(serviceName);
 355                 endpoints = getMatchingEndpoints(endpoints, exchange);
 356                 theEndpoint = getServiceChooser(exchange).chooseEndpoint(endpoints, context, exchange);
 357                 if (theEndpoint == null) {
 358                     log.warn("ServiceName (" + serviceName + ") specified for routing, but can't find it registered");
 359                 }
 360             }
 361             if (theEndpoint == null && interfaceName != null) {
 362                 ServiceEndpoint[] endpoints = registry.getEndpointsForInterface(interfaceName);
 363                 endpoints = getMatchingEndpoints(endpoints, exchange);
 364                 theEndpoint = (InternalEndpoint) getInterfaceChooser(exchange).chooseEndpoint(endpoints, context, exchange);
 365                 if (theEndpoint == null) {
 366                     log.warn("InterfaceName (" + interfaceName + ") specified for routing, but can't find any matching components");
 367                 }
 368             }
 369             if (theEndpoint == null) {
 370                                                 ActivationSpec activationSpec = exchange.getActivationSpec();
 373                 if (activationSpec != null) {
 374                     EndpointResolver destinationResolver = activationSpec.getDestinationResolver();
 375                     if (destinationResolver != null) {
 376                         try {
 377                             EndpointFilter filter = createEndpointFilter(context, exchange);
 378                             theEndpoint = (InternalEndpoint) destinationResolver.resolveEndpoint(context, exchange, filter);
 379                         }
 380                         catch (JBIException e) {
 381                             throw new MessagingException("Failed to resolve endpoint: " + e, e);
 382                         }
 383                     }
 384                 }
 385             }
 386         }
 387         if (theEndpoint != null) {
 388             exchange.setEndpoint(theEndpoint);
 389         }
 390         if (log.isTraceEnabled()) {
 391             log.trace("Routing exchange " + exchange + " to: " + theEndpoint);
 392         }
 393     }
 394
 395
 403     protected ServiceEndpoint[] getMatchingEndpoints(ServiceEndpoint[] endpoints, MessageExchangeImpl exchange) {
 404         List
  filtered = new ArrayList  (); 405         ComponentMBeanImpl consumer = getRegistry().getComponent(exchange.getSourceId());
 406
 407         for (int i = 0; i < endpoints.length; i++) {
 408             ComponentNameSpace id = ((InternalEndpoint) endpoints[i]).getComponentNameSpace();
 409             if (id != null) {
 410                 ComponentMBeanImpl provider = getRegistry().getComponent(id);
 411                 if (provider != null) {
 412                     if (!consumer.getComponent().isExchangeWithProviderOkay(endpoints[i], exchange) ||
 413                         !provider.getComponent().isExchangeWithConsumerOkay(endpoints[i], exchange)) {
 414                         continue;
 415                     }
 416                 }
 417             }
 418             filtered.add(endpoints[i]);
 419         }
 420         return (ServiceEndpoint[]) filtered.toArray(new ServiceEndpoint[filtered.size()]);
 421     }
 422
 423
 426     public EndpointChooser getDefaultInterfaceChooser() {
 427         return defaultInterfaceChooser;
 428     }
 429
 430
 435     public void setDefaultInterfaceChooser(EndpointChooser defaultInterfaceChooser) {
 436         this.defaultInterfaceChooser = defaultInterfaceChooser;
 437     }
 438
 439
 442     public EndpointChooser getDefaultServiceChooser() {
 443         return defaultServiceChooser;
 444     }
 445
 446
 451     public void setDefaultServiceChooser(EndpointChooser defaultServiceChooser) {
 452         this.defaultServiceChooser = defaultServiceChooser;
 453     }
 454
 455
 458     public FlowChooser getDefaultFlowChooser() {
 459         return defaultFlowChooser;
 460     }
 461
 462
 465     public void setDefaultFlowChooser(FlowChooser defaultFlowChooser) {
 466         this.defaultFlowChooser = defaultFlowChooser;
 467     }
 468
 469
 477     protected EndpointChooser getServiceChooser(MessageExchangeImpl exchange) {
 478         EndpointChooser chooser = null;
 479         ActivationSpec activationSpec = exchange.getActivationSpec();
 480         if (activationSpec != null) {
 481             chooser = activationSpec.getServiceChooser();
 482         }
 483         if (chooser == null) {
 484             chooser = defaultServiceChooser;
 485         }
 486         return chooser;
 487     }
 488
 489
 497     protected EndpointChooser getInterfaceChooser(MessageExchangeImpl exchange) {
 498         EndpointChooser chooser = null;
 499         ActivationSpec activationSpec = exchange.getActivationSpec();
 500         if (activationSpec != null) {
 501             chooser = activationSpec.getInterfaceChooser();
 502         }
 503         if (chooser == null) {
 504             chooser = defaultInterfaceChooser;
 505         }
 506         return chooser;
 507     }
 508
 509
 517     protected EndpointFilter createEndpointFilter(ComponentContextImpl context, MessageExchangeImpl exchange) {
 518         Component component = context.getComponent();
 519         if (exchange.getRole() == Role.PROVIDER) {
 520             return new ConsumerComponentEndpointFilter(component);
 521         }
 522         else {
 523             return new ProducerComponentEndpointFilter(component);
 524         }
 525     }
 526
 527
 533     public MBeanOperationInfo
  [] getOperationInfos() throws JMException  { 534         OperationInfoHelper helper = new OperationInfoHelper();
 535         helper.addOperation(getObjectToManage(), "suspend", "suspend the NMR processing");
 536         helper.addOperation(getObjectToManage(), "resume", "resume the NMR processing");
 537
 538         return OperationInfoHelper.join(super.getOperationInfos(), helper.getOperationInfos());
 539     }
 540
 541     public JBIContainer getContainer() {
 542         return container;
 543     }
 544
 545 }
                                                                                                                                                                                                             |                                                                       
 
 
 
 
 
                                                                                   Popular Tags                                                                                                                                                                                              |