1 17 package org.apache.servicemix.jbi.framework; 18 19 import java.util.ArrayList ; 20 import java.util.Collection ; 21 import java.util.HashSet ; 22 import java.util.Iterator ; 23 import java.util.Map ; 24 import java.util.Set ; 25 26 import javax.jbi.JBIException; 27 import javax.jbi.component.ComponentContext; 28 import javax.jbi.servicedesc.ServiceEndpoint; 29 import javax.management.JMException ; 30 import javax.management.ObjectName ; 31 import javax.wsdl.Definition; 32 import javax.wsdl.Port; 33 import javax.wsdl.PortType; 34 import javax.wsdl.Service; 35 import javax.wsdl.factory.WSDLFactory; 36 import javax.wsdl.xml.WSDLReader; 37 import javax.xml.namespace.QName ; 38 39 import org.apache.commons.logging.Log; 40 import org.apache.commons.logging.LogFactory; 41 import org.apache.servicemix.jbi.deployment.Provides; 42 import org.apache.servicemix.jbi.deployment.Services; 43 import org.apache.servicemix.jbi.event.EndpointEvent; 44 import org.apache.servicemix.jbi.event.EndpointListener; 45 import org.apache.servicemix.jbi.servicedesc.AbstractServiceEndpoint; 46 import org.apache.servicemix.jbi.servicedesc.ExternalEndpoint; 47 import org.apache.servicemix.jbi.servicedesc.InternalEndpoint; 48 import org.apache.servicemix.jbi.servicedesc.LinkedEndpoint; 49 import org.w3c.dom.Document ; 50 51 import com.ibm.wsdl.Constants; 52 53 import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap; 54 55 60 public class EndpointRegistry { 61 62 private static final Log logger = LogFactory.getLog(EndpointRegistry.class); 63 64 private Registry registry; 65 66 private Map endpointMBeans; 67 68 private Map internalEndpoints; 69 70 private Map externalEndpoints; 71 72 private Map linkedEndpoints; 73 74 private Map interfaceConnections; 75 76 81 public EndpointRegistry(Registry registry) { 82 this.registry = registry; 83 this.endpointMBeans = new ConcurrentHashMap(); 84 this.internalEndpoints = new ConcurrentHashMap(); 85 this.externalEndpoints = new ConcurrentHashMap(); 86 this.linkedEndpoints = new ConcurrentHashMap(); 87 this.interfaceConnections = new ConcurrentHashMap(); 88 } 89 90 public ServiceEndpoint[] getEndpointsForComponent(ComponentNameSpace cns) { 91 Collection endpoints = new ArrayList (); 92 for (Iterator iter = getInternalEndpoints().iterator(); iter.hasNext();) { 93 InternalEndpoint endpoint = (InternalEndpoint) iter.next(); 94 if (cns.equals(endpoint.getComponentNameSpace())) { 95 endpoints.add(endpoint); 96 } 97 } 98 return asEndpointArray(endpoints); 99 } 100 101 104 public Collection getEndpointMBeans() { 105 return endpointMBeans.values(); 106 } 107 108 114 public ServiceEndpoint[] getEndpointsForService(QName serviceName) { 115 Collection collection = getEndpointsByService(serviceName, getInternalEndpoints()); 116 return asEndpointArray(collection); 117 } 118 119 127 public ServiceEndpoint[] getEndpointsForInterface(QName interfaceName) { 128 if (interfaceName == null) { 129 return asEndpointArray(internalEndpoints.values()); 130 } 131 InterfaceConnection conn = (InterfaceConnection) interfaceConnections.get(interfaceName); 132 if (conn != null) { 133 String key = getKey(conn.service, conn.endpoint); 134 ServiceEndpoint ep = (ServiceEndpoint) internalEndpoints.get(key); 135 if (ep == null) { 136 logger.warn("Connection for interface " + interfaceName + " could not find target for service " + conn.service + " and endpoint " + conn.endpoint); 137 return new ServiceEndpoint[0]; 138 } else { 139 return new ServiceEndpoint[] { ep }; 140 } 141 } 142 Collection result = getEndpointsByInterface(interfaceName, getInternalEndpoints()); 143 return asEndpointArray(result); 144 } 145 146 155 public InternalEndpoint registerInternalEndpoint(ComponentContextImpl provider, QName serviceName, String endpointName) throws JBIException { 156 String key = getKey(serviceName, endpointName); 158 InternalEndpoint registered = (InternalEndpoint) internalEndpoints.get(key); 159 if (registered != null && registered.isLocal()) { 161 throw new JBIException("An internal endpoint for service " + serviceName + " and endpoint " + endpointName + " is already registered"); 162 } 163 InternalEndpoint serviceEndpoint = new InternalEndpoint(provider.getComponentNameSpace(), endpointName, serviceName); 165 if (provider.getActivationSpec().getInterfaceName() != null) { 167 serviceEndpoint.addInterface(provider.getActivationSpec().getInterfaceName()); 168 } 169 retrieveInterfaceFromSUDescriptor(serviceEndpoint); 171 retrieveInterfacesFromDescription(serviceEndpoint); 173 if (registered != null) { 175 InternalEndpoint[] remote = registered.getRemoteEndpoints(); 176 for (int i = 0; i < remote.length; i++) { 177 serviceEndpoint.addRemoteEndpoint(remote[i]); 178 } 179 } 180 internalEndpoints.put(key, serviceEndpoint); 182 registerEndpoint(serviceEndpoint); 183 fireEvent(serviceEndpoint, EndpointEvent.INTERNAL_ENDPOINT_REGISTERED); 184 return serviceEndpoint; 185 } 186 187 193 public void unregisterInternalEndpoint(ComponentContext provider, InternalEndpoint serviceEndpoint) { 194 if (serviceEndpoint.isClustered()) { 195 serviceEndpoint.setComponentName(null); 197 } else { 198 String key = getKey(serviceEndpoint); 199 internalEndpoints.remove(key); 200 unregisterEndpoint(key); 201 } 202 fireEvent(serviceEndpoint, EndpointEvent.INTERNAL_ENDPOINT_UNREGISTERED); 203 } 204 205 210 protected void retrieveInterfaceFromSUDescriptor(InternalEndpoint serviceEndpoint) { 211 ServiceUnitLifeCycle[] sus = registry.getDeployedServiceUnits(serviceEndpoint.getComponentNameSpace().getName()); 212 for (int i = 0; i < sus.length; i++) { 213 Services services = sus[i].getServices(); 214 if (services != null) { 215 Provides[] provides = services.getProvides(); 216 if (provides != null) { 217 for (int j = 0; j < provides.length; j++) { 218 if (provides[j].getInterfaceName() != null && 219 serviceEndpoint.getServiceName().equals(provides[j].getServiceName()) && 220 serviceEndpoint.getEndpointName().equals(provides[j].getEndpointName())) { 221 if (logger.isDebugEnabled()) { 222 logger.debug("Endpoint " + serviceEndpoint + " is provided by SU " + sus[i].getName()); 223 logger.debug("Endpoint " + serviceEndpoint + " implements interface " + provides[j].getInterfaceName()); 224 } 225 serviceEndpoint.addInterface(provides[j].getInterfaceName()); 226 } 227 } 228 } 229 } 230 } 231 } 232 233 238 protected void retrieveInterfacesFromDescription(InternalEndpoint serviceEndpoint) { 239 try { 240 Document document = registry.getEndpointDescriptor(serviceEndpoint); 241 if (document == null) { 242 if (logger.isDebugEnabled()) { 243 logger.debug("Endpoint " + serviceEndpoint + " has no service description"); 244 } 245 return; 246 } 247 WSDLReader reader = WSDLFactory.newInstance().newWSDLReader(); 248 reader.setFeature(Constants.FEATURE_VERBOSE, false); 249 Definition definition = reader.readWSDL(null, document); 250 if (definition.getPortTypes().keySet().size() == 1 && 254 definition.getServices().keySet().size() == 0) { 255 PortType portType = (PortType) definition.getPortTypes().values().iterator().next(); 256 QName interfaceName = portType.getQName(); 257 if (logger.isDebugEnabled()) { 258 logger.debug("Endpoint " + serviceEndpoint + " implements interface " + interfaceName); 259 } 260 serviceEndpoint.addInterface(interfaceName); 261 } else { 262 Service service = definition.getService(serviceEndpoint.getServiceName()); 263 if (service == null) { 264 logger.info("Endpoint " + serviceEndpoint + " has a service description, but no matching service found in " + definition.getServices().keySet()); 265 return; 266 } 267 Port port = service.getPort(serviceEndpoint.getEndpointName()); 268 if (port == null) { 269 logger.info("Endpoint " + serviceEndpoint + " has a service description, but no matching endpoint found in " + service.getPorts().keySet()); 270 return; 271 } 272 if (port.getBinding() == null) { 273 logger.info("Endpoint " + serviceEndpoint + " has a service description, but no binding found"); 274 return; 275 } 276 if (port.getBinding().getPortType() == null) { 277 logger.info("Endpoint " + serviceEndpoint + " has a service description, but no port type found"); 278 return; 279 } 280 QName interfaceName = port.getBinding().getPortType().getQName(); 281 if (logger.isDebugEnabled()) { 282 logger.debug("Endpoint " + serviceEndpoint + " implements interface " + interfaceName); 283 } 284 serviceEndpoint.addInterface(interfaceName); 285 } 286 } catch (Exception e) { 287 logger.warn("Error retrieving interfaces from service description: " + e.getMessage()); 288 if (logger.isDebugEnabled()) { 289 logger.debug("Error retrieving interfaces from service description", e); 290 } 291 } 292 } 293 294 299 public void registerRemoteEndpoint(InternalEndpoint remote) { 300 InternalEndpoint endpoint = (InternalEndpoint) internalEndpoints.get(getKey(remote)); 301 if (endpoint == null) { 303 endpoint = new InternalEndpoint(null, remote.getEndpointName(), remote.getServiceName()); 304 internalEndpoints.put(getKey(endpoint), endpoint); 305 } 306 endpoint.addRemoteEndpoint(remote); 308 fireEvent(remote, EndpointEvent.REMOTE_ENDPOINT_REGISTERED); 309 } 310 311 316 public void unregisterRemoteEndpoint(InternalEndpoint remote) { 317 InternalEndpoint endpoint = (InternalEndpoint) internalEndpoints.get(getKey(remote)); 318 if (endpoint != null) { 319 endpoint.removeRemoteEndpoint(remote); 320 fireEvent(remote, EndpointEvent.REMOTE_ENDPOINT_UNREGISTERED); 321 } 322 } 323 324 331 public ServiceEndpoint getEndpoint(QName service, String name) { 332 String key = getKey(service, name); 333 ServiceEndpoint ep = (ServiceEndpoint) linkedEndpoints.get(key); 334 if (ep == null) { 335 ep = (ServiceEndpoint) internalEndpoints.get(key); 336 } 337 return ep; 338 } 339 340 public ServiceEndpoint getInternalEndpoint(QName service, String name) { 341 return (ServiceEndpoint) internalEndpoints.get(getKey(service, name)); 342 } 343 344 353 public void registerExternalEndpoint(ComponentNameSpace cns, ServiceEndpoint externalEndpoint) throws JBIException { 354 ExternalEndpoint serviceEndpoint = new ExternalEndpoint(cns, externalEndpoint); 355 if (externalEndpoints.get(getKey(serviceEndpoint)) != null) { 356 throw new JBIException("An external endpoint for service " + externalEndpoint.getServiceName() + " and endpoint " + externalEndpoint.getEndpointName() + " is already registered"); 357 } 358 registerEndpoint(serviceEndpoint); 359 externalEndpoints.put(getKey(serviceEndpoint), serviceEndpoint); 360 fireEvent(serviceEndpoint, EndpointEvent.EXTERNAL_ENDPOINT_REGISTERED); 361 } 362 363 371 public void unregisterExternalEndpoint(ComponentNameSpace cns, ServiceEndpoint externalEndpoint) { 372 externalEndpoints.remove(getKey(externalEndpoint)); 373 unregisterEndpoint(getKey(externalEndpoint)); 374 fireEvent(externalEndpoint, EndpointEvent.EXTERNAL_ENDPOINT_UNREGISTERED); 375 } 376 377 384 public ServiceEndpoint[] getExternalEndpointsForInterface(QName interfaceName) { 385 Collection endpoints = getEndpointsByInterface(interfaceName, getExternalEndpoints()); 386 return asEndpointArray(endpoints); 387 } 388 389 395 public ServiceEndpoint[] getExternalEndpointsForService(QName serviceName) { 396 Collection endpoints = getEndpointsByService(serviceName, getExternalEndpoints()); 397 return asEndpointArray(endpoints); 398 } 399 400 406 protected ServiceEndpoint[] asEndpointArray(Collection collection) { 407 if (collection == null) { 408 return new ServiceEndpoint[0]; 409 } 410 ServiceEndpoint[] answer = new ServiceEndpoint[collection.size()]; 411 answer = (ServiceEndpoint[]) collection.toArray(answer); 412 return answer; 413 } 414 415 422 protected Collection getEndpointsByService(QName serviceName, Collection endpoints) { 423 Collection answer = new ArrayList (); 424 for (Iterator i = endpoints.iterator(); i.hasNext();) { 425 ServiceEndpoint endpoint = (ServiceEndpoint) i.next(); 426 if (endpoint.getServiceName().equals(serviceName)) { 427 answer.add(endpoint); 428 } 429 } 430 return answer; 431 } 432 433 439 protected Collection getEndpointsByInterface(QName interfaceName, Collection endpoints) { 440 if (interfaceName == null) { 441 return endpoints; 442 } 443 Set answer = new HashSet (); 444 for (Iterator i = endpoints.iterator(); i.hasNext();) { 445 ServiceEndpoint endpoint = (ServiceEndpoint) i.next(); 446 QName [] interfaces = endpoint.getInterfaces(); 447 if (interfaces != null) { 448 for (int k = 0; k < interfaces.length;k ++) { 449 QName qn = interfaces[k]; 450 if (qn != null && qn.equals(interfaceName)) { 451 answer.add(endpoint); 452 break; 453 } 454 } 455 } 456 } 457 return answer; 458 } 459 460 463 protected Collection getInternalEndpoints() { 464 return internalEndpoints.values(); 465 } 466 467 470 protected Collection getExternalEndpoints() { 471 return externalEndpoints.values(); 472 } 473 474 484 public void registerEndpointConnection(QName fromSvc, String fromEp, QName toSvc, String toEp, String link) throws JBIException { 485 LinkedEndpoint ep = new LinkedEndpoint(fromSvc, fromEp, toSvc, toEp, link); 486 if (linkedEndpoints.get(getKey(ep)) != null) { 487 throw new JBIException("An endpoint connection for service " + ep.getServiceName() + " and name " + ep.getEndpointName() + " is already registered"); 488 } 489 linkedEndpoints.put(getKey(ep), ep); 490 registerEndpoint(ep); 491 fireEvent(ep, EndpointEvent.LINKED_ENDPOINT_REGISTERED); 492 } 493 494 500 public void unregisterEndpointConnection(QName fromSvc, String fromEp) { 501 unregisterEndpoint(getKey(fromSvc, fromEp)); 502 LinkedEndpoint ep = (LinkedEndpoint) linkedEndpoints.remove(getKey(fromSvc, fromEp)); 503 fireEvent(ep, EndpointEvent.LINKED_ENDPOINT_UNREGISTERED); 504 } 505 506 514 public void registerInterfaceConnection(QName fromItf, QName toSvc, String toEp) throws JBIException { 515 if (interfaceConnections.get(fromItf) != null) { 516 throw new JBIException("An interface connection for " + fromItf + " is already registered"); 517 } 518 interfaceConnections.put(fromItf, new InterfaceConnection(toSvc, toEp)); 519 } 520 521 526 public void unregisterInterfaceConnection(QName fromItf) { 527 interfaceConnections.remove(fromItf); 528 529 } 530 531 private void registerEndpoint(AbstractServiceEndpoint serviceEndpoint) { 532 String key = getKey(serviceEndpoint); 533 try { 534 Endpoint endpoint = new Endpoint(serviceEndpoint, registry); 535 ObjectName objectName = registry.getContainer().getManagementContext().createObjectName(endpoint); 536 registry.getContainer().getManagementContext().registerMBean(objectName, endpoint, EndpointMBean.class); 537 endpointMBeans.put(key, endpoint); 538 } catch (JMException e) { 539 logger.error("Could not register MBean for endpoint", e); 540 } 541 } 542 543 private void unregisterEndpoint(String key) { 544 Endpoint ep = (Endpoint) endpointMBeans.remove(key); 545 if (ep != null) { 546 try { 547 registry.getContainer().getManagementContext().unregisterMBean(ep); 548 } catch (JBIException e) { 549 logger.error("Could not unregister MBean for endpoint", e); 550 } 551 } 552 } 553 554 private String getKey(ServiceEndpoint ep) { 555 return getKey(ep.getServiceName(), ep.getEndpointName()); 556 } 557 558 private String getKey(QName svcName, String epName) { 559 return svcName + epName; 560 } 561 562 private static class InterfaceConnection { 563 QName service; 564 String endpoint; 565 InterfaceConnection(QName service, String endpoint) { 566 this.service = service; 567 this.endpoint = endpoint; 568 } 569 } 570 571 protected void fireEvent(ServiceEndpoint ep, int type) { 572 EndpointEvent event = new EndpointEvent(ep, type); 573 EndpointListener[] listeners = (EndpointListener[]) registry.getContainer().getListeners(EndpointListener.class); 574 for (int i = 0; i < listeners.length; i++) { 575 switch (type) { 576 case EndpointEvent.INTERNAL_ENDPOINT_REGISTERED: 577 listeners[i].internalEndpointRegistered(event); 578 break; 579 case EndpointEvent.INTERNAL_ENDPOINT_UNREGISTERED: 580 listeners[i].internalEndpointUnregistered(event); 581 break; 582 case EndpointEvent.EXTERNAL_ENDPOINT_REGISTERED: 583 listeners[i].externalEndpointRegistered(event); 584 break; 585 case EndpointEvent.EXTERNAL_ENDPOINT_UNREGISTERED: 586 listeners[i].externalEndpointUnregistered(event); 587 break; 588 case EndpointEvent.LINKED_ENDPOINT_REGISTERED: 589 listeners[i].linkedEndpointRegistered(event); 590 break; 591 case EndpointEvent.LINKED_ENDPOINT_UNREGISTERED: 592 listeners[i].linkedEndpointUnregistered(event); 593 break; 594 case EndpointEvent.REMOTE_ENDPOINT_REGISTERED: 595 listeners[i].remoteEndpointRegistered(event); 596 break; 597 case EndpointEvent.REMOTE_ENDPOINT_UNREGISTERED: 598 listeners[i].remoteEndpointUnregistered(event); 599 break; 600 } 601 } 602 603 } 604 605 } 606 | Popular Tags |