1 10 11 package org.mule.impl.model; 12 13 import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean; 14 import org.apache.commons.logging.Log; 15 import org.apache.commons.logging.LogFactory; 16 import org.mule.MuleManager; 17 import org.mule.config.i18n.Message; 18 import org.mule.config.i18n.Messages; 19 import org.mule.impl.DefaultComponentExceptionStrategy; 20 import org.mule.impl.MuleDescriptor; 21 import org.mule.impl.RequestContext; 22 import org.mule.impl.internal.notifications.ComponentNotification; 23 import org.mule.management.stats.ComponentStatistics; 24 import org.mule.providers.AbstractConnector; 25 import org.mule.umo.ComponentException; 26 import org.mule.umo.UMOComponent; 27 import org.mule.umo.UMODescriptor; 28 import org.mule.umo.UMOEvent; 29 import org.mule.umo.UMOException; 30 import org.mule.umo.UMOMessage; 31 import org.mule.umo.endpoint.UMOEndpoint; 32 import org.mule.umo.lifecycle.InitialisationException; 33 import org.mule.umo.model.ModelException; 34 import org.mule.umo.model.UMOModel; 35 import org.mule.umo.provider.DispatchException; 36 import org.mule.umo.provider.UMOMessageDispatcher; 37 import org.mule.umo.provider.UMOMessageReceiver; 38 import org.mule.util.concurrent.WaitableBoolean; 39 40 import java.beans.ExceptionListener ; 41 import java.util.ArrayList ; 42 import java.util.Iterator ; 43 import java.util.List ; 44 45 48 public abstract class AbstractComponent implements UMOComponent 49 { 50 53 protected transient Log logger = LogFactory.getLog(getClass()); 54 55 58 protected MuleDescriptor descriptor = null; 59 60 protected ComponentStatistics stats = null; 61 62 65 protected AtomicBoolean stopped = new AtomicBoolean(true); 66 67 70 protected WaitableBoolean stopping = new WaitableBoolean(false); 71 72 75 protected AtomicBoolean poolInitialised = new AtomicBoolean(false); 76 77 81 protected ExceptionListener exceptionListener = null; 82 83 86 protected AtomicBoolean initialised = new AtomicBoolean(false); 87 88 91 protected UMOModel model; 92 93 96 protected WaitableBoolean paused = new WaitableBoolean(false); 97 98 101 public AbstractComponent(MuleDescriptor descriptor, UMOModel model) 102 { 103 if (descriptor == null) 104 { 105 throw new IllegalArgumentException ("Descriptor cannot be null"); 106 } 107 this.descriptor = descriptor; 108 this.model = MuleManager.getInstance().getModel(); 109 } 110 111 120 public final synchronized void initialise() throws InitialisationException 121 { 122 if (initialised.get()) 123 { 124 throw new InitialisationException(new Message(Messages.OBJECT_X_ALREADY_INITIALISED, 125 "Component '" + descriptor.getName() + "'"), this); 126 } 127 descriptor.initialise(); 128 129 this.exceptionListener = descriptor.getExceptionListener(); 130 131 stats = new ComponentStatistics(getName(), descriptor.getPoolingProfile().getMaxActive(), 133 descriptor.getThreadingProfile().getMaxThreadsActive()); 134 135 stats.setEnabled(((MuleManager)MuleManager.getInstance()).getStatistics().isEnabled()); 136 ((MuleManager)MuleManager.getInstance()).getStatistics().add(stats); 137 stats.setOutboundRouterStat(getDescriptor().getOutboundRouter().getStatistics()); 138 stats.setInboundRouterStat(getDescriptor().getInboundRouter().getStatistics()); 139 140 doInitialise(); 141 initialised.set(true); 142 fireComponentNotification(ComponentNotification.COMPONENT_INITIALISED); 143 144 } 145 146 protected void fireComponentNotification(int action) 147 { 148 MuleManager.getInstance().fireNotification(new ComponentNotification(descriptor, action)); 149 } 150 151 void finaliseEvent(UMOEvent event) 152 { 153 logger.debug("Finalising event for: " + descriptor.getName() + " event endpointUri is: " 154 + event.getEndpoint().getEndpointURI()); 155 } 157 158 public void forceStop() throws UMOException 159 { 160 if (!stopped.get()) 161 { 162 logger.debug("Stopping UMOComponent"); 163 stopping.set(true); 164 fireComponentNotification(ComponentNotification.COMPONENT_STOPPING); 165 doForceStop(); 166 stopped.set(true); 167 stopping.set(false); 168 fireComponentNotification(ComponentNotification.COMPONENT_STOPPED); 169 } 170 } 171 172 public void stop() throws UMOException 173 { 174 if (!stopped.get()) 175 { 176 logger.debug("Stopping UMOComponent"); 177 stopping.set(true); 178 fireComponentNotification(ComponentNotification.COMPONENT_STOPPING); 179 180 unregisterListeners(); 182 if (MuleManager.getInstance().getQueueManager().getQueueSession().getQueue( 183 descriptor.getName() + ".component").size() > 0) 184 { 185 try 186 { 187 stopping.whenFalse(null); 188 } 189 catch (InterruptedException e) 190 { 191 } 193 } 194 195 doStop(); 196 stopped.set(true); 197 fireComponentNotification(ComponentNotification.COMPONENT_STOPPED); 198 } 199 } 200 201 public void start() throws UMOException 202 { 203 start(false); 204 } 205 206 212 protected void start(boolean startPaused) throws UMOException 213 { 214 215 registerListeners(); 217 218 connectListeners(); 225 226 if (stopped.get()) 228 { 229 stopped.set(false); 230 paused.set(false); 231 doStart(); 232 } 233 fireComponentNotification(ComponentNotification.COMPONENT_STARTED); 234 if (startPaused) 235 { 236 pause(); 237 } 238 239 startListeners(); 244 } 245 246 251 public final void pause() throws UMOException 252 { 253 254 doPause(); 255 paused.set(true); 256 fireComponentNotification(ComponentNotification.COMPONENT_PAUSED); 257 } 258 259 263 public final void resume() throws UMOException 264 { 265 doResume(); 266 paused.set(false); 267 fireComponentNotification(ComponentNotification.COMPONENT_RESUMED); 268 } 269 270 275 public boolean isPaused() 276 { 277 return paused.get(); 278 } 279 280 287 protected void doPause() throws UMOException 288 { 289 } 291 292 299 protected void doResume() throws UMOException 300 { 301 } 303 304 public final void dispose() 305 { 306 try 307 { 308 if (!stopped.get()) 309 { 310 stop(); 311 } 312 } 313 catch (UMOException e) 314 { 315 logger.error("Failed to stop component: " + descriptor.getName(), e); 316 } 317 doDispose(); 318 fireComponentNotification(ComponentNotification.COMPONENT_DISPOSED); 319 ((MuleManager)MuleManager.getInstance()).getStatistics().remove(stats); 320 } 321 322 public ComponentStatistics getStatistics() 323 { 324 return stats; 325 } 326 327 332 public UMODescriptor getDescriptor() 333 { 334 return descriptor; 335 } 336 337 public void dispatchEvent(UMOEvent event) throws UMOException 338 { 339 if (stopping.get() || stopped.get()) 340 { 341 throw new ComponentException(new Message(Messages.COMPONENT_X_IS_STOPPED, 342 getDescriptor().getName()), event.getMessage(), this); 343 } 344 345 try 346 { 347 waitIfPaused(event); 348 } 349 catch (InterruptedException e) 350 { 351 throw new ComponentException(event.getMessage(), this, e); 352 } 353 354 if (!event.getEndpoint().canReceive()) 357 { 358 UMOMessageDispatcher dispatcher = event.getEndpoint().getConnector().getDispatcher( 359 event.getEndpoint()); 360 try 361 { 362 dispatcher.dispatch(event); 363 } 364 catch (Exception e) 365 { 366 throw new DispatchException(event.getMessage(), event.getEndpoint(), e); 367 } 368 return; 369 } 370 371 if (stats.isEnabled()) 373 { 374 stats.incReceivedEventASync(); 375 } 376 if (logger.isDebugEnabled()) 377 { 378 logger.debug("Component: " + descriptor.getName() + " has received asynchronous event on: " 379 + event.getEndpoint().getEndpointURI()); 380 } 381 382 doDispatch(event); 383 } 384 385 public UMOMessage sendEvent(UMOEvent event) throws UMOException 386 { 387 if (stopping.get() || stopped.get()) 388 { 389 throw new ComponentException(new Message(Messages.COMPONENT_X_IS_STOPPED, 390 getDescriptor().getName()), event.getMessage(), this); 391 } 392 393 try 394 { 395 waitIfPaused(event); 396 } 397 catch (InterruptedException e) 398 { 399 throw new ComponentException(event.getMessage(), this, e); 400 } 401 402 if (stats.isEnabled()) 403 { 404 stats.incReceivedEventSync(); 405 } 406 if (logger.isDebugEnabled()) 407 { 408 logger.debug("Component: " + descriptor.getName() + " has received synchronous event on: " 409 + event.getEndpoint().getEndpointURI()); 410 } 411 RequestContext.setEvent(event); 412 return doSend(event); 413 } 414 415 423 protected void waitIfPaused(UMOEvent event) throws InterruptedException 424 { 425 if (logger.isDebugEnabled() && paused.get()) 426 { 427 logger.debug("Component: " + descriptor.getName() 428 + " is paused. Blocking call until resume is called"); 429 } 430 paused.whenFalse(null); 431 } 432 433 436 public String getName() 437 { 438 return descriptor.getName(); 439 } 440 441 446 public String toString() 447 { 448 return descriptor.getName(); 449 } 450 451 public boolean isStopped() 452 { 453 return stopped.get(); 454 } 455 456 public boolean isStopping() 457 { 458 return stopping.get(); 459 } 460 461 protected void handleException(Exception e) 462 { 463 if (exceptionListener instanceof DefaultComponentExceptionStrategy) 464 { 465 if (((DefaultComponentExceptionStrategy)exceptionListener).getComponent() == null) 466 { 467 ((DefaultComponentExceptionStrategy)exceptionListener).setComponent(this); 468 } 469 } 470 exceptionListener.exceptionThrown(e); 471 } 472 473 479 protected Object lookupComponent() throws UMOException 480 { 481 return ComponentFactory.createComponent(getDescriptor()); 482 } 483 484 protected void doForceStop() throws UMOException 485 { 486 } 488 489 protected void doStop() throws UMOException 490 { 491 } 493 494 protected void doStart() throws UMOException 495 { 496 } 498 499 protected void doDispose() 500 { 501 } 503 504 protected void doInitialise() throws InitialisationException 505 { 506 } 508 509 public boolean isStarted() 510 { 511 return !stopped.get(); 512 } 513 514 protected abstract UMOMessage doSend(UMOEvent event) throws UMOException; 515 516 protected abstract void doDispatch(UMOEvent event) throws UMOException; 517 518 527 public Object getInstance() throws UMOException 528 { 529 return lookupComponent(); 530 } 531 532 protected void registerListeners() throws UMOException 533 { 534 UMOEndpoint endpoint; 535 List endpoints = getIncomingEndpoints(); 536 537 for (Iterator it = endpoints.iterator(); it.hasNext();) 538 { 539 endpoint = (UMOEndpoint)it.next(); 540 try 541 { 542 endpoint.getConnector().registerListener(this, endpoint); 543 } 544 catch (UMOException e) 545 { 546 throw e; 547 } 548 catch (Exception e) 549 { 550 throw new ModelException(new Message(Messages.FAILED_TO_REGISTER_X_ON_ENDPOINT_X, 551 getDescriptor().getName(), endpoint.getEndpointURI()), e); 552 } 553 } 554 } 555 556 protected void unregisterListeners() throws UMOException 557 { 558 UMOEndpoint endpoint; 559 List endpoints = getIncomingEndpoints(); 560 561 for (Iterator it = endpoints.iterator(); it.hasNext();) 562 { 563 endpoint = (UMOEndpoint)it.next(); 564 try 565 { 566 endpoint.getConnector().unregisterListener(this, endpoint); 567 } 568 catch (UMOException e) 569 { 570 throw e; 571 } 572 catch (Exception e) 573 { 574 throw new ModelException(new Message(Messages.FAILED_TO_UNREGISTER_X_ON_ENDPOINT_X, 575 getDescriptor().getName(), endpoint.getEndpointURI()), e); 576 } 577 } 578 } 579 580 protected void startListeners() throws UMOException 581 { 582 UMOEndpoint endpoint; 583 List endpoints = getIncomingEndpoints(); 584 585 for (Iterator it = endpoints.iterator(); it.hasNext();) 586 { 587 endpoint = (UMOEndpoint)it.next(); 588 UMOMessageReceiver receiver = ((AbstractConnector)endpoint.getConnector()).getReceiver(this, 589 endpoint); 590 if (receiver != null && endpoint.getConnector().isStarted() 591 && endpoint.getInitialState().equals(UMOEndpoint.INITIAL_STATE_STARTED)) 592 { 593 receiver.start(); 594 } 595 } 596 } 597 598 protected void stopListeners() throws UMOException 599 { 600 UMOEndpoint endpoint; 601 List endpoints = getIncomingEndpoints(); 602 603 for (Iterator it = endpoints.iterator(); it.hasNext();) 604 { 605 endpoint = (UMOEndpoint)it.next(); 606 UMOMessageReceiver receiver = ((AbstractConnector)endpoint.getConnector()).getReceiver(this, 607 endpoint); 608 if (receiver != null) 609 { 610 receiver.stop(); 611 } 612 } 613 } 614 615 protected void connectListeners() throws UMOException 616 { 617 UMOEndpoint endpoint; 618 List endpoints = getIncomingEndpoints(); 619 620 for (Iterator it = endpoints.iterator(); it.hasNext();) 621 { 622 endpoint = (UMOEndpoint)it.next(); 623 UMOMessageReceiver receiver = ((AbstractConnector)endpoint.getConnector()).getReceiver(this, 624 endpoint); 625 if (receiver != null) 626 { 627 try 628 { 629 receiver.connect(); 630 } 631 catch (Exception e) 632 { 633 throw new ModelException( 634 Message.createStaticMessage("Failed to connect listener for endpoint " 635 + endpoint.getName()), e); 636 } 637 } 638 } 639 } 640 641 protected void disconnectListeners() throws UMOException 642 { 643 UMOEndpoint endpoint; 644 List endpoints = getIncomingEndpoints(); 645 646 for (Iterator it = endpoints.iterator(); it.hasNext();) 647 { 648 endpoint = (UMOEndpoint)it.next(); 649 UMOMessageReceiver receiver = ((AbstractConnector)endpoint.getConnector()).getReceiver(this, 650 endpoint); 651 if (receiver != null) 652 { 653 try 654 { 655 receiver.disconnect(); 656 } 657 catch (Exception e) 658 { 659 throw new ModelException( 660 Message.createStaticMessage("Failed to connect listener for endpoint " 661 + endpoint.getName()), e); 662 } 663 } 664 } 665 } 666 667 670 protected List getIncomingEndpoints() 671 { 672 List endpoints = new ArrayList (); 673 674 endpoints.addAll(getDescriptor().getInboundRouter().getEndpoints()); 676 if (getDescriptor().getInboundEndpoint() != null) 678 { 679 endpoints.add(getDescriptor().getInboundEndpoint()); 680 } 681 682 if (getDescriptor().getResponseRouter() != null 684 && getDescriptor().getResponseRouter().getEndpoints() != null) 685 { 686 endpoints.addAll(getDescriptor().getResponseRouter().getEndpoints()); 687 } 688 return endpoints; 689 } 690 691 } 692 | Popular Tags |