package org.mule.providers;

import java.beans.ExceptionListener;
import java.io.OutputStream;

import javax.resource.spi.work.Work;
import javax.resource.spi.work.WorkManager;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.mule.MuleRuntimeException;
import org.mule.config.MuleProperties;
import org.mule.config.ThreadingProfile;
import org.mule.config.i18n.Message;
import org.mule.config.i18n.Messages;
import org.mule.impl.ImmutableMuleEndpoint;
import org.mule.impl.RequestContext;
import org.mule.impl.internal.notifications.ConnectionNotification;
import org.mule.impl.internal.notifications.MessageNotification;
import org.mule.impl.internal.notifications.SecurityNotification;
import org.mule.transaction.TransactionCoordination;
import org.mule.umo.TransactionException;
import org.mule.umo.UMOEvent;
import org.mule.umo.UMOException;
import org.mule.umo.UMOMessage;
import org.mule.umo.UMOTransaction;
import org.mule.umo.endpoint.UMOEndpointURI;
import org.mule.umo.endpoint.UMOImmutableEndpoint;
import org.mule.umo.manager.UMOWorkManager;
import org.mule.umo.provider.DispatchException;
import org.mule.umo.provider.ReceiveException;
import org.mule.umo.provider.UMOConnector;
import org.mule.umo.provider.UMOMessageDispatcher;
import org.mule.util.concurrent.WaitableBoolean;

import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;

/**
 * Base class for message dispatchers. It implements the common plumbing around
 * the transport-specific <code>doXxx</code> template methods: outbound security
 * filtering, optional asynchronous dispatch through a work manager, connection
 * handling through the connector's {@link ConnectionStrategy}, notification
 * firing and disposal.
 * <p>
 * Subclasses supply the transport behaviour by implementing
 * {@link #doDispatch(UMOEvent)}, {@link #doSend(UMOEvent)},
 * {@link #doReceive(UMOImmutableEndpoint, long)},
 * {@link #doConnect(UMOImmutableEndpoint)}, {@link #doDisconnect()} and
 * {@link #doDispose()}.
 */
public abstract class AbstractMessageDispatcher implements UMOMessageDispatcher, ExceptionListener
{
    /**
     * Logger bound to the concrete subclass.
     */
    protected transient Log logger = LogFactory.getLog(getClass());

    /**
     * Work manager used for asynchronous dispatching; stays <code>null</code>
     * when the dispatcher threading profile disables threading.
     */
    protected UMOWorkManager workManager = null;

    protected final UMOImmutableEndpoint endpoint;
    protected final AbstractConnector connector;

    protected boolean disposed = false;

    protected boolean doThreading = true;

    protected ConnectionStrategy connectionStrategy;

    protected final WaitableBoolean connected = new WaitableBoolean(false);
    private final AtomicBoolean connecting = new AtomicBoolean(false);

    /**
     * Creates a dispatcher for the given endpoint. The endpoint's connector
     * must be an {@link AbstractConnector}. When the connector's dispatcher
     * threading profile enables threading, a dedicated work manager is created
     * and started.
     *
     * @param endpoint the endpoint this dispatcher serves
     * @throws MuleRuntimeException if the work manager cannot be started; the
     *             dispatcher is disposed before the exception is thrown
     */
    public AbstractMessageDispatcher(UMOImmutableEndpoint endpoint)
    {
        this.endpoint = endpoint;
        this.connector = (AbstractConnector)endpoint.getConnector();

        connectionStrategy = connector.getConnectionStrategy();
        if (connectionStrategy instanceof AbstractConnectionStrategy)
        {
            // NOTE(review): threading is switched off on the strategy here,
            // presumably so connection attempts run in the calling thread -
            // confirm against AbstractConnectionStrategy.
            ((AbstractConnectionStrategy)connectionStrategy).setDoThreading(false);
        }

        ThreadingProfile profile = connector.getDispatcherThreadingProfile();
        doThreading = profile.isDoThreading();
        if (doThreading)
        {
            workManager = connector.createDispatcherWorkManager(connector.getName() + ".dispatchers");
            try
            {
                workManager.start();
            }
            catch (UMOException e)
            {
                dispose();
                throw new MuleRuntimeException(new Message(Messages.FAILED_TO_START_X, "WorkManager"), e);
            }
        }
    }

    /**
     * Dispatches the event asynchronously. The event is marked
     * non-synchronous, stamped with the endpoint URI property and passed
     * through the endpoint's security filter. It is then either scheduled on
     * the work manager (threading enabled, event still non-synchronous and no
     * current transaction) or dispatched in the calling thread.
     * <p>
     * If authentication fails with a security exception, a notification is
     * fired, the connector's exception handling is invoked and the method
     * returns without dispatching.
     *
     * @param event the event to dispatch
     * @throws DispatchException if dispatching fails; the dispatcher is
     *             disposed before the exception is thrown
     */
    public final void dispatch(UMOEvent event) throws DispatchException
    {
        try
        {
            event.setSynchronous(false);
            event.getMessage().setProperty(MuleProperties.MULE_ENDPOINT_PROPERTY,
                event.getEndpoint().getEndpointURI().toString());
            RequestContext.setEvent(event);

            if (!authenticateOutbound(event))
            {
                return;
            }

            // Re-read the event from the request context after the security
            // step, as the original code does; presumably a filter may have
            // replaced it - confirm against the security filter contract.
            event = RequestContext.getEvent();

            try
            {
                UMOTransaction tx = TransactionCoordination.getInstance().getTransaction();
                if (doThreading && !event.isSynchronous() && tx == null)
                {
                    workManager.scheduleWork(new Worker(event), WorkManager.INDEFINITE, null, connector);
                }
                else
                {
                    dispatchAndNotify(event);
                }
            }
            catch (DispatchException e)
            {
                dispose();
                throw e;
            }
            catch (Exception e)
            {
                dispose();
                throw new DispatchException(event.getMessage(), event.getEndpoint(), e);
            }
        }
        finally
        {
            if (connector.isCreateDispatcherPerRequest())
            {
                dispose();
            }
        }
    }

    /**
     * Sends the event synchronously and returns the result of
     * {@link #doSend(UMOEvent)}. If the current transaction is marked for
     * rollback, or outbound authentication fails with a security exception,
     * nothing is sent and the event's own message is returned.
     *
     * @param event the event to send
     * @return the response message, which may be <code>null</code>; the
     *         remote-sync property is removed from a non-null result
     * @throws DispatchException if sending fails; the dispatcher is disposed
     *             before the exception is thrown
     */
    public final UMOMessage send(UMOEvent event) throws DispatchException
    {
        try
        {
            if (isTransactionRollback())
            {
                return event.getMessage();
            }

            event.setSynchronous(true);
            event.getMessage().setProperty(MuleProperties.MULE_ENDPOINT_PROPERTY,
                event.getEndpoint().getEndpointURI().toString());
            RequestContext.setEvent(event);

            if (!authenticateOutbound(event))
            {
                return event.getMessage();
            }

            // Re-read the event from the request context after the security
            // step, as the original code does.
            event = RequestContext.getEvent();

            try
            {
                connectionStrategy.connect(this);

                UMOMessage result = doSend(event);
                fireMessageNotification(event, MessageNotification.MESSAGE_SENT);

                if (result != null)
                {
                    result.removeProperty(MuleProperties.MULE_REMOTE_SYNC_PROPERTY);
                }
                return result;
            }
            catch (DispatchException e)
            {
                dispose();
                throw e;
            }
            catch (Exception e)
            {
                dispose();
                throw new DispatchException(event.getMessage(), event.getEndpoint(), e);
            }
        }
        finally
        {
            if (connector.isCreateDispatcherPerRequest())
            {
                dispose();
            }
        }
    }

    /**
     * Convenience overload that wraps the URI in a new
     * {@link ImmutableMuleEndpoint} and delegates to
     * {@link #receive(UMOImmutableEndpoint, long)}.
     *
     * @param endpointUri the URI to receive from
     * @param timeout the timeout passed through to the transport
     * @return the received message, or <code>null</code> if the transport
     *         returned none
     * @throws Exception if the receive fails
     */
    public final UMOMessage receive(UMOEndpointURI endpointUri, long timeout) throws Exception
    {
        return receive(new ImmutableMuleEndpoint(endpointUri.toString(), true), timeout);
    }

    /**
     * Connects if necessary and requests a message from the given endpoint. A
     * message-received notification is fired when a message was obtained and
     * the connector has message events enabled.
     *
     * @param endpoint the endpoint to receive from
     * @param timeout the timeout passed through to
     *            {@link #doReceive(UMOImmutableEndpoint, long)}
     * @return the received message, or <code>null</code> if the transport
     *         returned none
     * @throws Exception a {@link DispatchException} is rethrown as is; any
     *             other failure is wrapped in a {@link ReceiveException}. The
     *             dispatcher is disposed in both cases.
     */
    public final UMOMessage receive(UMOImmutableEndpoint endpoint, long timeout) throws Exception
    {
        try
        {
            try
            {
                connectionStrategy.connect(this);

                UMOMessage result = doReceive(endpoint, timeout);
                if (result != null && connector.isEnableMessageEvents())
                {
                    // There is no event here, hence no component name. A typed
                    // variable is kept rather than a bare null literal so the
                    // constructor call cannot become ambiguous.
                    String component = null;
                    connector.fireNotification(new MessageNotification(result, endpoint, component,
                        MessageNotification.MESSAGE_RECEIVED));
                }
                return result;
            }
            catch (DispatchException e)
            {
                dispose();
                throw e;
            }
            catch (Exception e)
            {
                dispose();
                throw new ReceiveException(endpoint, timeout, e);
            }
        }
        finally
        {
            if (connector.isCreateDispatcherPerRequest())
            {
                dispose();
            }
        }
    }

    /**
     * {@link ExceptionListener} callback: hands the exception to the connector
     * and then always disposes this dispatcher.
     *
     * @param e the exception that was raised
     */
    public void exceptionThrown(Exception e)
    {
        try
        {
            getConnector().handleException(e);
        }
        finally
        {
            dispose();
        }
    }

    /**
     * @return <code>true</code> once {@link #dispose()} has run
     */
    public final boolean isDisposed()
    {
        return disposed;
    }

    /**
     * Disconnects, releases transport resources through {@link #doDispose()}
     * and disposes the work manager, if any. Calls after the first one are
     * no-ops. A failure while disconnecting is logged and does not stop the
     * remaining clean-up.
     */
    public final synchronized void dispose()
    {
        if (disposed)
        {
            return;
        }

        try
        {
            try
            {
                disconnect();
            }
            catch (Exception e)
            {
                logger.warn(e.getMessage(), e);
            }

            try
            {
                doDispose();
            }
            finally
            {
                // Release the work manager even when doDispose() throws.
                // 'disposed' is set below regardless, so it would otherwise
                // never be released.
                if (workManager != null)
                {
                    workManager.dispose();
                }
            }
        }
        finally
        {
            disposed = true;
        }
    }

    /**
     * @return the connector that owns this dispatcher's endpoint
     */
    public UMOConnector getConnector()
    {
        return connector;
    }

    /**
     * Decides whether the dispatcher should wait for a remote response to the
     * event. Remote sync applies only if the event's connector has it enabled
     * and either the endpoint or the message's remote-sync property asks for
     * it. When the event has a component, remote sync is additionally used
     * only if that component's descriptor has no response router.
     * <p>
     * Side effect: when the answer is <code>false</code> the remote-sync
     * property is removed from the event's message.
     *
     * @param event the current event
     * @return <code>true</code> if a remote response should be awaited
     */
    protected boolean useRemoteSync(UMOEvent event)
    {
        boolean remoteSync = false;
        if (event.getEndpoint().getConnector().isRemoteSyncEnabled())
        {
            remoteSync = event.getEndpoint().isRemoteSync()
                            || event.getMessage().getBooleanProperty(
                                MuleProperties.MULE_REMOTE_SYNC_PROPERTY, false);
            if (remoteSync && event.getComponent() != null)
            {
                remoteSync = event.getComponent().getDescriptor().getResponseRouter() == null;
            }
        }
        if (!remoteSync)
        {
            event.getMessage().removeProperty(MuleProperties.MULE_REMOTE_SYNC_PROPERTY);
        }
        return remoteSync;
    }

    /**
     * Returns an output stream for writing directly to the endpoint. This
     * base implementation always returns <code>null</code>.
     *
     * @param endpoint the endpoint to write to
     * @param message the current message
     * @return always <code>null</code> in this base implementation
     * @throws UMOException declared for overriding implementations; not thrown
     *             here
     */
    public OutputStream getOutputStream(UMOImmutableEndpoint endpoint, UMOMessage message)
        throws UMOException
    {
        return null;
    }

    /**
     * Connects this dispatcher to its endpoint; a no-op when already
     * connected.
     * <p>
     * The method has two paths, selected by the private <code>connecting</code>
     * flag. An invocation that finds the flag clear sets it, delegates to the
     * connection strategy and returns. An invocation that finds it already set
     * performs the actual {@link #doConnect(UMOImmutableEndpoint)}, fires the
     * connection notification and, on success, marks the dispatcher connected
     * and clears the flag.
     * <p>
     * NOTE(review): this presumably relies on the strategy calling back into
     * this method - confirm against the {@link ConnectionStrategy}
     * implementations. The flag is not cleared when connecting fails; this is
     * left unchanged because it affects how later attempts are routed.
     *
     * @throws Exception a {@link ConnectException} if
     *             {@link #doConnect(UMOImmutableEndpoint)} fails, or whatever
     *             the connection strategy throws
     */
    public synchronized void connect() throws Exception
    {
        if (connected.get())
        {
            return;
        }

        if (disposed)
        {
            // NOTE(review): only a warning is logged; the connect attempt
            // still proceeds, exactly as in the original code.
            if (logger.isWarnEnabled())
            {
                logger.warn("Dispatcher has been disposed. Cannot connector resource");
            }
        }

        if (logger.isDebugEnabled())
        {
            logger.debug("Attempting to connect to: " + endpoint.getEndpointURI());
        }

        if (connecting.compareAndSet(false, true))
        {
            connectionStrategy.connect(this);
            logger.info("Successfully connected to: " + endpoint.getEndpointURI());
            return;
        }

        try
        {
            doConnect(endpoint);
            connector.fireNotification(new ConnectionNotification(this, getConnectEventId(endpoint),
                ConnectionNotification.CONNECTION_CONNECTED));
        }
        catch (Exception e)
        {
            connector.fireNotification(new ConnectionNotification(this, getConnectEventId(endpoint),
                ConnectionNotification.CONNECTION_FAILED));
            if (e instanceof ConnectException)
            {
                throw (ConnectException)e;
            }
            else
            {
                throw new ConnectException(e, this);
            }
        }

        connected.set(true);
        connecting.set(false);
    }

    /**
     * Fires a disconnected notification, marks the dispatcher as not connected
     * and then calls {@link #doDisconnect()}.
     *
     * @throws Exception if the transport-specific disconnect fails
     */
    public synchronized void disconnect() throws Exception
    {
        if (logger.isDebugEnabled())
        {
            logger.debug("Disconnecting from: " + endpoint.getEndpointURI());
        }

        connector.fireNotification(new ConnectionNotification(this, getConnectEventId(endpoint),
            ConnectionNotification.CONNECTION_DISCONNECTED));
        connected.set(false);
        doDisconnect();
        logger.info("Disconnected from: " + endpoint.getEndpointURI());
    }

    /**
     * Builds the identifier used in connection notifications.
     *
     * @param endpoint the endpoint being connected or disconnected
     * @return <code>connectorName + ".dispatcher (" + endpointURI + ")"</code>
     */
    protected String getConnectEventId(UMOImmutableEndpoint endpoint)
    {
        return connector.getName() + ".dispatcher (" + endpoint.getEndpointURI() + ")";
    }

    /**
     * @return <code>true</code> if the dispatcher is currently marked as
     *         connected
     */
    public final boolean isConnected()
    {
        return connected.get();
    }

    /**
     * Returns a description of the connection, here the string form of the
     * endpoint URI.
     *
     * @return the endpoint URI as a string
     */
    public String getConnectionDescription()
    {
        return endpoint.getEndpointURI().toString();
    }

    /**
     * Disconnects and then connects again.
     *
     * @throws Exception if either step fails
     */
    public synchronized void reconnect() throws Exception
    {
        disconnect();
        connect();
    }

    /**
     * Releases transport-specific resources; called from {@link #dispose()}.
     */
    protected abstract void doDispose();

    /**
     * Transport-specific asynchronous dispatch.
     *
     * @param event the event to dispatch
     * @throws Exception if the dispatch fails
     */
    protected abstract void doDispatch(UMOEvent event) throws Exception;

    /**
     * Transport-specific synchronous send.
     *
     * @param event the event to send
     * @return the response message, or <code>null</code> if there is none
     * @throws Exception if the send fails
     */
    protected abstract UMOMessage doSend(UMOEvent event) throws Exception;

    /**
     * Transport-specific connect; called from {@link #connect()}.
     *
     * @param endpoint the endpoint to connect to
     * @throws Exception if the connection cannot be established
     */
    protected abstract void doConnect(UMOImmutableEndpoint endpoint) throws Exception;

    /**
     * Transport-specific disconnect; called from {@link #disconnect()}.
     *
     * @throws Exception if the disconnect fails
     */
    protected abstract void doDisconnect() throws Exception;

    /**
     * Transport-specific receive; called from
     * {@link #receive(UMOImmutableEndpoint, long)}.
     *
     * @param endpoint the endpoint to receive from
     * @param timeout the timeout passed in by the caller; its interpretation
     *            is left to the transport
     * @return the received message, or <code>null</code> if there is none
     * @throws Exception if the receive fails
     */
    protected abstract UMOMessage doReceive(UMOImmutableEndpoint endpoint, long timeout) throws Exception;

    /**
     * Reports whether the current transaction, if any, is marked
     * rollback-only. A failure to query the transaction is logged and treated
     * as "not rolled back".
     *
     * @return <code>true</code> if a transaction exists and is rollback-only
     */
    protected boolean isTransactionRollback()
    {
        try
        {
            UMOTransaction tx = TransactionCoordination.getInstance().getTransaction();
            if (tx != null && tx.isRollbackOnly())
            {
                return true;
            }
        }
        catch (TransactionException e)
        {
            // Log the throwable as well so the stack trace is not lost.
            logger.warn(e.getMessage(), e);
        }
        return false;
    }

    /**
     * Runs the endpoint's security filter, if one is configured, against an
     * outbound event. Shared by {@link #dispatch(UMOEvent)} and
     * {@link #send(UMOEvent)} so both report authentication failures the same
     * way.
     *
     * @param event the outbound event
     * @return <code>true</code> if there is no filter or authentication
     *         succeeded; <code>false</code> if the filter rejected the event
     *         with a security exception, which has then been logged, notified
     *         and passed to the connector's exception handling
     * @throws DispatchException if the filter fails with any other
     *             {@link UMOException}; the dispatcher is disposed first
     */
    private boolean authenticateOutbound(UMOEvent event) throws DispatchException
    {
        UMOImmutableEndpoint eventEndpoint = event.getEndpoint();
        if (eventEndpoint.getSecurityFilter() == null)
        {
            return true;
        }

        try
        {
            eventEndpoint.getSecurityFilter().authenticate(event);
            return true;
        }
        catch (org.mule.umo.security.SecurityException e)
        {
            logger.warn("Outbound Request was made but was not authenticated: " + e.getMessage(), e);
            connector.fireNotification(new SecurityNotification(e,
                SecurityNotification.SECURITY_AUTHENTICATION_FAILED));
            connector.handleException(e);
            return false;
        }
        catch (UMOException e)
        {
            dispose();
            throw new DispatchException(event.getMessage(), event.getEndpoint(), e);
        }
    }

    /**
     * Connects through the connection strategy, performs the transport
     * dispatch and fires the message-dispatched notification. Used by the
     * in-thread path of {@link #dispatch(UMOEvent)} and by {@link Worker}.
     *
     * @param event the event to dispatch
     * @throws Exception if connecting or dispatching fails
     */
    private void dispatchAndNotify(UMOEvent event) throws Exception
    {
        connectionStrategy.connect(this);
        doDispatch(event);
        fireMessageNotification(event, MessageNotification.MESSAGE_DISPATCHED);
    }

    /**
     * Fires a {@link MessageNotification} for the event if the connector has
     * message events enabled. The notification carries the name from the
     * event's component descriptor, or <code>null</code> when the event has no
     * component.
     *
     * @param event the event whose message and endpoint are reported
     * @param action one of the <code>MessageNotification.MESSAGE_*</code>
     *            action codes
     */
    private void fireMessageNotification(UMOEvent event, int action)
    {
        if (!connector.isEnableMessageEvents())
        {
            return;
        }

        String component = null;
        if (event.getComponent() != null)
        {
            component = event.getComponent().getDescriptor().getName();
        }
        connector.fireNotification(new MessageNotification(event.getMessage(), event.getEndpoint(),
            component, action));
    }

    /**
     * Unit of work scheduled on the work manager to dispatch one event
     * asynchronously.
     */
    private class Worker implements Work
    {
        private final UMOEvent event;

        public Worker(UMOEvent event)
        {
            this.event = event;
        }

        /**
         * Sets the event on the request context of the executing thread and
         * dispatches it. Any failure is handed to the connector's exception
         * handling rather than propagated.
         */
        public void run()
        {
            try
            {
                RequestContext.setEvent(event);
                dispatchAndNotify(event);
            }
            catch (Exception e)
            {
                getConnector().handleException(e);
            }
        }

        /**
         * Nothing to release.
         */
        public void release()
        {
            // no-op
        }
    }
}