1 17 package org.apache.servicemix.jbi.nmr; 18 19 import java.util.ArrayList ; 20 import java.util.List ; 21 22 import javax.jbi.JBIException; 23 import javax.jbi.component.Component; 24 import javax.jbi.messaging.MessageExchange; 25 import javax.jbi.messaging.MessagingException; 26 import javax.jbi.messaging.MessageExchange.Role; 27 import javax.jbi.servicedesc.ServiceEndpoint; 28 import javax.management.JMException ; 29 import javax.management.MBeanOperationInfo ; 30 import javax.resource.spi.work.WorkManager ; 31 import javax.xml.namespace.QName ; 32 33 import org.apache.commons.logging.Log; 34 import org.apache.commons.logging.LogFactory; 35 import org.apache.servicemix.jbi.container.ActivationSpec; 36 import org.apache.servicemix.jbi.container.JBIContainer; 37 import org.apache.servicemix.jbi.framework.ComponentContextImpl; 38 import org.apache.servicemix.jbi.framework.ComponentMBeanImpl; 39 import org.apache.servicemix.jbi.framework.ComponentNameSpace; 40 import org.apache.servicemix.jbi.framework.Registry; 41 import org.apache.servicemix.jbi.management.BaseSystemService; 42 import org.apache.servicemix.jbi.management.ManagementContext; 43 import org.apache.servicemix.jbi.management.OperationInfoHelper; 44 import org.apache.servicemix.jbi.messaging.MessageExchangeImpl; 45 import org.apache.servicemix.jbi.nmr.flow.DefaultFlowChooser; 46 import org.apache.servicemix.jbi.nmr.flow.Flow; 47 import org.apache.servicemix.jbi.nmr.flow.FlowChooser; 48 import org.apache.servicemix.jbi.nmr.flow.FlowProvider; 49 import org.apache.servicemix.jbi.resolver.ConsumerComponentEndpointFilter; 50 import org.apache.servicemix.jbi.resolver.EndpointChooser; 51 import org.apache.servicemix.jbi.resolver.EndpointFilter; 52 import org.apache.servicemix.jbi.resolver.EndpointResolver; 53 import org.apache.servicemix.jbi.resolver.FirstChoicePolicy; 54 import org.apache.servicemix.jbi.resolver.ProducerComponentEndpointFilter; 55 import org.apache.servicemix.jbi.servicedesc.AbstractServiceEndpoint; 56 import org.apache.servicemix.jbi.servicedesc.ExternalEndpoint; 57 import org.apache.servicemix.jbi.servicedesc.InternalEndpoint; 58 import org.apache.servicemix.jbi.servicedesc.LinkedEndpoint; 59 60 65 public class DefaultBroker extends BaseSystemService implements Broker { 66 67 private Registry registry; 68 private String flowNames = "seda"; 69 private String subscriptionFlowName = null; 70 private WorkManager workManager; 71 private Flow[] flows; 72 private final static Log log = LogFactory.getLog(DefaultBroker.class); 73 private EndpointChooser defaultServiceChooser = new FirstChoicePolicy(); 74 private EndpointChooser defaultInterfaceChooser = new FirstChoicePolicy(); 75 private SubscriptionManager subscriptionManager = new SubscriptionManager(); 76 private FlowChooser defaultFlowChooser = new DefaultFlowChooser(); 77 78 81 public DefaultBroker() { 82 } 83 84 89 public String getDescription() { 90 return "Normalized Message Router"; 91 } 92 93 96 public WorkManager getWorkManager() { 97 return workManager; 98 } 99 100 104 public void setWorkManager(WorkManager workManager) { 105 this.workManager = workManager; 106 } 107 108 public SubscriptionManager getSubscriptionManager() { 109 return subscriptionManager; 110 } 111 112 115 public void setSubscriptionManager(SubscriptionManager subscriptionManager) { 116 this.subscriptionManager = subscriptionManager; 117 } 118 119 125 public void init(JBIContainer container) throws JBIException { 126 super.init(container); 127 this.workManager = container.getWorkManager(); 128 this.registry = container.getRegistry(); 129 if (this.flows == null) { 131 String [] names = flowNames.split(","); 132 flows = new Flow[names.length]; 133 for (int i = 0; i < names.length; i++) { 134 flows[i] = FlowProvider.getFlow(names[i]); 135 flows[i].init(this); 136 } 137 } else { 138 for (int i = 0; i < flows.length; i++) { 139 flows[i].init(this); 140 } 141 } 142 subscriptionManager.init(this, registry); 143 } 144 145 protected Class getServiceMBean() { 146 return BrokerMBean.class; 147 } 148 149 154 public String getContainerName() { 155 return container.getName(); 156 } 157 158 163 public ManagementContext getManagementContext() { 164 return container.getManagementContext(); 165 } 166 167 172 public Registry getRegistry() { 173 return registry; 174 } 175 176 181 public void start() throws JBIException { 182 for (int i = 0; i < flows.length; i++) { 183 flows[i].start(); 184 } 185 super.start(); 186 } 187 188 193 public void stop() throws JBIException { 194 for (int i = 0; i < flows.length; i++) { 195 flows[i].stop(); 196 } 197 super.stop(); 198 } 199 200 205 public void shutDown() throws JBIException { 206 stop(); 207 for (int i = 0; i < flows.length; i++) { 208 flows[i].shutDown(); 209 } 210 super.shutDown(); 211 container.getManagementContext().unregisterMBean(this); 212 } 213 214 217 public String getFlowNames() { 218 return flowNames; 219 } 220 221 225 public void setFlowNames(String flowNames) { 226 this.flowNames = flowNames; 227 } 228 229 232 public String getSubscriptionFlowName() { 233 return subscriptionFlowName; 234 } 235 236 240 public void setSubscriptionFlowName(String subscriptionFlowName) { 241 this.subscriptionFlowName = subscriptionFlowName; 242 } 243 244 249 public void setFlows(Flow[] flows) { 250 this.flows = flows; 251 } 252 253 256 public Flow[] getFlows() { 257 return this.flows; 258 } 259 260 263 public void suspend() { 264 for (int i = 0; i < flows.length; i++) { 265 flows[i].suspend(); 266 } 267 } 268 269 272 public void resume() { 273 for (int i = 0; i < flows.length; i++) { 274 flows[i].resume(); 275 } 276 } 277 278 284 public void sendExchangePacket(MessageExchange me) throws JBIException { 285 MessageExchangeImpl exchange = (MessageExchangeImpl) me; 286 if (exchange.getRole() == Role.PROVIDER && exchange.getDestinationId() == null) { 287 resolveAddress(exchange); 288 } 289 290 boolean foundRoute = false; 291 if (exchange.getEndpoint() != null || exchange.getRole() == Role.CONSUMER) { 293 foundRoute = true; 294 Flow flow = defaultFlowChooser.chooseFlow(flows, exchange); 295 if (flow == null) { 296 throw new MessagingException("Unable to choose a flow for exchange: " + exchange); 297 } 298 flow.send(exchange); 299 } 300 301 if (exchange.getRole() == Role.PROVIDER) { 302 getSubscriptionManager().dispatchToSubscribers(exchange); 303 } 304 305 if (!foundRoute) { 306 boolean throwException = true; 307 ActivationSpec activationSpec = exchange.getActivationSpec(); 308 if (activationSpec != null) { 309 throwException = activationSpec.isFailIfNoDestinationEndpoint(); 310 } 311 if (throwException) { 312 throw new MessagingException("Could not find route for exchange: " + exchange + " for service: " + exchange.getService() + " and interface: " 313 + exchange.getInterfaceName()); 314 } else if (exchange.getMirror().getSyncState() == MessageExchangeImpl.SYNC_STATE_SYNC_SENT) { 315 exchange.handleAccept(); 316 ComponentContextImpl ctx = (ComponentContextImpl) getSubscriptionManager().getContext(); 317 exchange.setDestinationId(ctx.getComponentNameSpace()); 318 getSubscriptionManager().done(exchange); 320 } 321 } 322 } 323 324 protected void resolveAddress(MessageExchangeImpl exchange) throws JBIException { 325 ServiceEndpoint theEndpoint = exchange.getEndpoint(); 326 if (theEndpoint != null) { 327 if (theEndpoint instanceof ExternalEndpoint) { 328 throw new JBIException("External endpoints can not be used for routing: should be an internal or dynamic endpoint."); 329 } 330 if (theEndpoint instanceof AbstractServiceEndpoint == false) { 331 throw new JBIException("Component-specific endpoints can not be used for routing: should be an internal or dynamic endpoint."); 332 } 333 } 334 if (theEndpoint instanceof LinkedEndpoint) { 336 QName svcName = ((LinkedEndpoint) theEndpoint).getToService(); 337 String epName = ((LinkedEndpoint) theEndpoint).getToEndpoint(); 338 ServiceEndpoint ep = registry.getInternalEndpoint(svcName, epName); 339 if (ep == null) { 340 throw new JBIException("Could not resolve linked endpoint: " + theEndpoint); 341 } 342 theEndpoint = ep; 343 } 344 345 ComponentContextImpl context = exchange.getSourceContext(); 347 if (theEndpoint == null) { 348 QName serviceName = exchange.getService(); 349 QName interfaceName = exchange.getInterfaceName(); 350 351 if (serviceName != null) { 354 ServiceEndpoint[] endpoints = registry.getEndpointsForService(serviceName); 355 endpoints = getMatchingEndpoints(endpoints, exchange); 356 theEndpoint = getServiceChooser(exchange).chooseEndpoint(endpoints, context, exchange); 357 if (theEndpoint == null) { 358 log.warn("ServiceName (" + serviceName + ") specified for routing, but can't find it registered"); 359 } 360 } 361 if (theEndpoint == null && interfaceName != null) { 362 ServiceEndpoint[] endpoints = registry.getEndpointsForInterface(interfaceName); 363 endpoints = getMatchingEndpoints(endpoints, exchange); 364 theEndpoint = (InternalEndpoint) getInterfaceChooser(exchange).chooseEndpoint(endpoints, context, exchange); 365 if (theEndpoint == null) { 366 log.warn("InterfaceName (" + interfaceName + ") specified for routing, but can't find any matching components"); 367 } 368 } 369 if (theEndpoint == null) { 370 ActivationSpec activationSpec = exchange.getActivationSpec(); 373 if (activationSpec != null) { 374 EndpointResolver destinationResolver = activationSpec.getDestinationResolver(); 375 if (destinationResolver != null) { 376 try { 377 EndpointFilter filter = createEndpointFilter(context, exchange); 378 theEndpoint = (InternalEndpoint) destinationResolver.resolveEndpoint(context, exchange, filter); 379 } 380 catch (JBIException e) { 381 throw new MessagingException("Failed to resolve endpoint: " + e, e); 382 } 383 } 384 } 385 } 386 } 387 if (theEndpoint != null) { 388 exchange.setEndpoint(theEndpoint); 389 } 390 if (log.isTraceEnabled()) { 391 log.trace("Routing exchange " + exchange + " to: " + theEndpoint); 392 } 393 } 394 395 403 protected ServiceEndpoint[] getMatchingEndpoints(ServiceEndpoint[] endpoints, MessageExchangeImpl exchange) { 404 List filtered = new ArrayList (); 405 ComponentMBeanImpl consumer = getRegistry().getComponent(exchange.getSourceId()); 406 407 for (int i = 0; i < endpoints.length; i++) { 408 ComponentNameSpace id = ((InternalEndpoint) endpoints[i]).getComponentNameSpace(); 409 if (id != null) { 410 ComponentMBeanImpl provider = getRegistry().getComponent(id); 411 if (provider != null) { 412 if (!consumer.getComponent().isExchangeWithProviderOkay(endpoints[i], exchange) || 413 !provider.getComponent().isExchangeWithConsumerOkay(endpoints[i], exchange)) { 414 continue; 415 } 416 } 417 } 418 filtered.add(endpoints[i]); 419 } 420 return (ServiceEndpoint[]) filtered.toArray(new ServiceEndpoint[filtered.size()]); 421 } 422 423 426 public EndpointChooser getDefaultInterfaceChooser() { 427 return defaultInterfaceChooser; 428 } 429 430 435 public void setDefaultInterfaceChooser(EndpointChooser defaultInterfaceChooser) { 436 this.defaultInterfaceChooser = defaultInterfaceChooser; 437 } 438 439 442 public EndpointChooser getDefaultServiceChooser() { 443 return defaultServiceChooser; 444 } 445 446 451 public void setDefaultServiceChooser(EndpointChooser defaultServiceChooser) { 452 this.defaultServiceChooser = defaultServiceChooser; 453 } 454 455 458 public FlowChooser getDefaultFlowChooser() { 459 return defaultFlowChooser; 460 } 461 462 465 public void setDefaultFlowChooser(FlowChooser defaultFlowChooser) { 466 this.defaultFlowChooser = defaultFlowChooser; 467 } 468 469 477 protected EndpointChooser getServiceChooser(MessageExchangeImpl exchange) { 478 EndpointChooser chooser = null; 479 ActivationSpec activationSpec = exchange.getActivationSpec(); 480 if (activationSpec != null) { 481 chooser = activationSpec.getServiceChooser(); 482 } 483 if (chooser == null) { 484 chooser = defaultServiceChooser; 485 } 486 return chooser; 487 } 488 489 497 protected EndpointChooser getInterfaceChooser(MessageExchangeImpl exchange) { 498 EndpointChooser chooser = null; 499 ActivationSpec activationSpec = exchange.getActivationSpec(); 500 if (activationSpec != null) { 501 chooser = activationSpec.getInterfaceChooser(); 502 } 503 if (chooser == null) { 504 chooser = defaultInterfaceChooser; 505 } 506 return chooser; 507 } 508 509 517 protected EndpointFilter createEndpointFilter(ComponentContextImpl context, MessageExchangeImpl exchange) { 518 Component component = context.getComponent(); 519 if (exchange.getRole() == Role.PROVIDER) { 520 return new ConsumerComponentEndpointFilter(component); 521 } 522 else { 523 return new ProducerComponentEndpointFilter(component); 524 } 525 } 526 527 533 public MBeanOperationInfo [] getOperationInfos() throws JMException { 534 OperationInfoHelper helper = new OperationInfoHelper(); 535 helper.addOperation(getObjectToManage(), "suspend", "suspend the NMR processing"); 536 helper.addOperation(getObjectToManage(), "resume", "resume the NMR processing"); 537 538 return OperationInfoHelper.join(super.getOperationInfos(), helper.getOperationInfos()); 539 } 540 541 public JBIContainer getContainer() { 542 return container; 543 } 544 545 } | Popular Tags |