1 10 11 package org.mule.providers; 12 13 import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean; 14 import org.apache.commons.logging.Log; 15 import org.apache.commons.logging.LogFactory; 16 import org.mule.config.ExceptionHelper; 17 import org.mule.config.i18n.Message; 18 import org.mule.config.i18n.Messages; 19 import org.mule.impl.MuleEvent; 20 import org.mule.impl.MuleMessage; 21 import org.mule.impl.MuleSession; 22 import org.mule.impl.RequestContext; 23 import org.mule.impl.ResponseOutputStream; 24 import org.mule.impl.internal.notifications.ConnectionNotification; 25 import org.mule.impl.internal.notifications.MessageNotification; 26 import org.mule.impl.internal.notifications.SecurityNotification; 27 import org.mule.transaction.TransactionCoordination; 28 import org.mule.umo.UMOComponent; 29 import org.mule.umo.UMOEvent; 30 import org.mule.umo.UMOException; 31 import org.mule.umo.UMOMessage; 32 import org.mule.umo.UMOSession; 33 import org.mule.umo.UMOTransaction; 34 import org.mule.umo.endpoint.UMOEndpoint; 35 import org.mule.umo.endpoint.UMOEndpointURI; 36 import org.mule.umo.lifecycle.InitialisationException; 37 import org.mule.umo.manager.UMOWorkManager; 38 import org.mule.umo.provider.UMOConnector; 39 import org.mule.umo.provider.UMOMessageReceiver; 40 import org.mule.umo.security.SecurityException; 41 import org.mule.umo.transformer.TransformerException; 42 import org.mule.umo.transformer.UMOTransformer; 43 import org.mule.util.StringMessageUtils; 44 import org.mule.util.concurrent.WaitableBoolean; 45 46 import java.io.OutputStream ; 47 48 56 public abstract class AbstractMessageReceiver implements UMOMessageReceiver 57 { 58 61 protected transient Log logger = LogFactory.getLog(getClass()); 62 63 66 protected UMOComponent component = null; 67 68 71 protected UMOEndpoint endpoint = null; 72 73 private InternalMessageListener listener; 74 77 protected AbstractConnector connector = null; 78 79 protected AtomicBoolean disposing = new AtomicBoolean(false); 80 81 protected WaitableBoolean connected = new WaitableBoolean(false); 82 83 protected WaitableBoolean stopped = new WaitableBoolean(true); 84 85 private AtomicBoolean connecting = new AtomicBoolean(false); 86 87 93 private UMOEndpointURI endpointUri; 94 95 private UMOWorkManager workManager; 96 97 protected ConnectionStrategy connectionStrategy; 98 99 114 public AbstractMessageReceiver(UMOConnector connector, UMOComponent component, UMOEndpoint endpoint) 115 throws InitialisationException 116 { 117 setConnector(connector); 118 setComponent(component); 119 setEndpoint(endpoint); 120 listener = new DefaultInternalMessageListener(); 121 endpointUri = endpoint.getEndpointURI(); 122 123 workManager = this.connector.createReceiverWorkManager(endpoint.getName()); 124 try 125 { 126 workManager.start(); 127 } 128 catch (UMOException e) 129 { 130 throw new InitialisationException(e, this); 131 } 132 133 connectionStrategy = this.connector.getConnectionStrategy(); 134 } 135 136 141 public UMOEndpoint getEndpoint() 142 { 143 return endpoint; 144 } 145 146 151 public void handleException(Exception exception) 152 { 153 if (exception instanceof ConnectException) 154 { 155 logger.info("Exception caught is a ConnectException, disconnecting receiver and invoking ReconnectStrategy"); 156 try 157 { 158 disconnect(); 159 } 160 catch (Exception e) 161 { 162 connector.getExceptionListener().exceptionThrown(e); 163 } 164 } 165 connector.getExceptionListener().exceptionThrown(exception); 166 if (exception instanceof ConnectException) 167 { 168 try 169 { 170 logger.warn("Reconnecting after exception: " + exception.getMessage(), exception); 171 connectionStrategy.connect(this); 172 } 173 catch (UMOException e) 174 { 175 connector.getExceptionListener().exceptionThrown(e); 176 } 177 } 178 } 179 180 187 protected void setExceptionDetails(UMOMessage message, Throwable exception) 188 { 189 String propName = ExceptionHelper.getErrorCodePropertyName(connector.getProtocol()); 190 if (propName != null) 193 { 194 String code = ExceptionHelper.getErrorMapping(connector.getProtocol(), exception.getClass()); 195 if (logger.isDebugEnabled()) 196 { 197 logger.debug("Setting error code for: " + connector.getProtocol() + ", " + propName + "=" 198 + code); 199 } 200 message.setProperty(propName, code); 201 } 202 } 203 204 public UMOConnector getConnector() 205 { 206 return connector; 207 } 208 209 public void setConnector(UMOConnector connector) 210 { 211 if (connector != null) 212 { 213 if (connector instanceof AbstractConnector) 214 { 215 this.connector = (AbstractConnector)connector; 216 } 217 else 218 { 219 throw new IllegalArgumentException (new Message( 220 Messages.PROPERTY_X_IS_NOT_SUPPORTED_TYPE_X_IT_IS_TYPE_X, "connector", 221 AbstractConnector.class.getName(), connector.getClass().getName()).getMessage()); 222 } 223 } 224 else 225 { 226 throw new NullPointerException (new Message(Messages.X_IS_NULL, "connector").getMessage()); 227 } 228 } 229 230 public UMOComponent getComponent() 231 { 232 return component; 233 } 234 235 public final UMOMessage routeMessage(UMOMessage message) throws UMOException 236 { 237 return routeMessage(message, (endpoint.isSynchronous() || TransactionCoordination.getInstance() 238 .getTransaction() != null)); 239 } 240 241 public final UMOMessage routeMessage(UMOMessage message, boolean synchronous) throws UMOException 242 { 243 UMOTransaction tx = TransactionCoordination.getInstance().getTransaction(); 244 return routeMessage(message, tx, tx != null || synchronous, null); 245 } 246 247 public final UMOMessage routeMessage(UMOMessage message, UMOTransaction trans, boolean synchronous) 248 throws UMOException 249 { 250 return routeMessage(message, trans, synchronous, null); 251 } 252 253 public final UMOMessage routeMessage(UMOMessage message, OutputStream outputStream) throws UMOException 254 { 255 return routeMessage(message, endpoint.isSynchronous(), outputStream); 256 } 257 258 public final UMOMessage routeMessage(UMOMessage message, boolean synchronous, OutputStream outputStream) 259 throws UMOException 260 { 261 UMOTransaction tx = TransactionCoordination.getInstance().getTransaction(); 262 return routeMessage(message, tx, tx != null || synchronous, outputStream); 263 } 264 265 public final UMOMessage routeMessage(UMOMessage message, 266 UMOTransaction trans, 267 boolean synchronous, 268 OutputStream outputStream) throws UMOException 269 { 270 271 if (connector.isEnableMessageEvents()) 272 { 273 connector.fireNotification(new MessageNotification(message, endpoint, component.getDescriptor() 274 .getName(), MessageNotification.MESSAGE_RECEIVED)); 275 } 276 277 if (logger.isDebugEnabled()) 278 { 279 logger.debug("Message Received from: " + endpoint.getEndpointURI()); 280 } 281 if (logger.isTraceEnabled()) 282 { 283 try 284 { 285 logger.trace("Message Payload: \n" 286 + StringMessageUtils.truncate(StringMessageUtils.toString(message.getPayload()), 287 200, false)); 288 } 289 catch (Exception e) 290 { 291 } 293 } 294 295 if (endpoint.getFilter() != null) 297 { 298 if (!endpoint.getFilter().accept(message)) 299 { 300 handleUnacceptedFilter(message); 301 return null; 302 } 303 } 304 return listener.onMessage(message, trans, synchronous, outputStream); 305 } 306 307 protected UMOMessage handleUnacceptedFilter(UMOMessage message) 308 { 309 String messageId = null; 310 messageId = message.getUniqueId(); 311 312 if (logger.isDebugEnabled()) 313 { 314 logger.debug("Message " + messageId + " failed to pass filter on endpoint: " + endpoint 315 + ". Message is being ignored"); 316 } 317 318 return null; 319 } 320 321 326 public void setEndpoint(UMOEndpoint endpoint) 327 { 328 if (endpoint == null) 329 { 330 throw new IllegalArgumentException ("Provider cannot be null"); 331 } 332 this.endpoint = endpoint; 333 } 334 335 340 public void setComponent(UMOComponent component) 341 { 342 if (component == null) 343 { 344 throw new IllegalArgumentException ("Component cannot be null"); 345 } 346 this.component = component; 347 } 348 349 public final void dispose() 350 { 351 stop(); 352 disposing.set(true); 353 doDispose(); 354 workManager.dispose(); 355 } 356 357 361 protected void doDispose() 362 { 363 } 365 366 public UMOEndpointURI getEndpointURI() 367 { 368 return endpointUri; 369 } 370 371 protected UMOWorkManager getWorkManager() 372 { 373 return workManager; 374 } 375 376 protected void setWorkManager(UMOWorkManager workManager) 377 { 378 this.workManager = workManager; 379 } 380 381 public void connect() throws Exception 382 { 383 if (connected.get()) 384 { 385 return; 386 } 387 if (logger.isDebugEnabled()) 388 { 389 logger.debug("Attempting to connect to: " + endpoint.getEndpointURI()); 390 } 391 if (connecting.compareAndSet(false, true)) 392 { 393 connectionStrategy.connect(this); 394 logger.info("Successfully connected to: " + endpoint.getEndpointURI()); 395 return; 396 } 397 398 try 399 { 400 doConnect(); 401 connector.fireNotification(new ConnectionNotification(this, getConnectEventId(), 402 ConnectionNotification.CONNECTION_CONNECTED)); 403 } 404 catch (Exception e) 405 { 406 connector.fireNotification(new ConnectionNotification(this, getConnectEventId(), 407 ConnectionNotification.CONNECTION_FAILED)); 408 if (e instanceof ConnectException) 409 { 410 throw (ConnectException)e; 411 } 412 else 413 { 414 throw new ConnectException(e, this); 415 } 416 } 417 connected.set(true); 418 connecting.set(false); 419 } 420 421 public void disconnect() throws Exception 422 { 423 if (logger.isDebugEnabled()) 424 { 425 logger.debug("Disconnecting from: " + endpoint.getEndpointURI()); 426 } 427 connector.fireNotification(new ConnectionNotification(this, getConnectEventId(), 428 ConnectionNotification.CONNECTION_DISCONNECTED)); 429 connected.set(false); 430 doDisconnect(); 431 logger.info("Disconnected from: " + endpoint.getEndpointURI()); 432 } 433 434 public String getConnectionDescription() 435 { 436 return endpoint.getEndpointURI().toString(); 437 } 438 439 public final void start() throws UMOException 440 { 441 if (stopped.commit(true, false)) 442 { 443 if (!connected.get()) 444 { 445 connectionStrategy.connect(this); 446 } 447 doStart(); 448 } 449 } 450 451 public final void stop() 452 { 453 try 454 { 455 if (connected.get()) 456 { 457 disconnect(); 458 } 459 } 460 catch (Exception e) 461 { 462 logger.error(e.getMessage(), e); 463 } 464 465 if (stopped.commit(false, true)) 466 { 467 try 468 { 469 doStop(); 470 } 471 catch (UMOException e) 472 { 473 logger.error(e.getMessage(), e); 474 } 475 476 } 477 } 478 479 public final boolean isConnected() 480 { 481 return connected.get(); 482 } 483 484 public InternalMessageListener getListener() 485 { 486 return listener; 487 } 488 489 public void setListener(InternalMessageListener listener) 490 { 491 this.listener = listener; 492 } 493 494 private class DefaultInternalMessageListener implements InternalMessageListener 495 { 496 497 public UMOMessage onMessage(UMOMessage message, 498 UMOTransaction trans, 499 boolean synchronous, 500 OutputStream outputStream) throws UMOException 501 { 502 503 UMOMessage resultMessage = null; 504 ResponseOutputStream ros = null; 505 if (outputStream != null) 506 { 507 if (outputStream instanceof ResponseOutputStream) 508 { 509 ros = (ResponseOutputStream)outputStream; 510 } 511 else 512 { 513 ros = new ResponseOutputStream(outputStream); 514 } 515 } 516 UMOSession session = new MuleSession(message, connector.getSessionHandler(), component); 517 UMOEvent muleEvent = new MuleEvent(message, endpoint, session, synchronous, ros); 518 RequestContext.setEvent(muleEvent); 519 520 boolean authorised = false; 522 if (endpoint.getSecurityFilter() != null) 523 { 524 try 525 { 526 endpoint.getSecurityFilter().authenticate(muleEvent); 527 authorised = true; 528 } 529 catch (SecurityException e) 530 { 531 logger.warn("Request was made but was not authenticated: " + e.getMessage(), e); 532 connector.fireNotification(new SecurityNotification(e, 533 SecurityNotification.SECURITY_AUTHENTICATION_FAILED)); 534 handleException(e); 535 resultMessage = message; 536 } 538 } 539 else 540 { 541 authorised = true; 542 } 543 544 if (authorised) 545 { 546 muleEvent = RequestContext.getEvent(); 549 550 if (UMOEndpoint.ENDPOINT_TYPE_RESPONSE.equals(endpoint.getType())) 552 { 553 component.getDescriptor().getResponseRouter().route(muleEvent); 554 return null; 555 } 556 else 557 { 558 resultMessage = component.getDescriptor().getInboundRouter().route(muleEvent); 559 } 560 } 561 if (resultMessage != null) 562 { 563 RequestContext.rewriteEvent(resultMessage); 564 if (resultMessage.getExceptionPayload() != null) 565 { 566 setExceptionDetails(resultMessage, resultMessage.getExceptionPayload().getException()); 567 } 568 } 569 return applyResponseTransformer(resultMessage); 570 } 571 } 572 573 protected String getConnectEventId() 574 { 575 return connector.getName() + ".receiver (" + endpoint.getEndpointURI() + ")"; 576 } 577 578 protected UMOMessage applyResponseTransformer(UMOMessage returnMessage) throws TransformerException 579 { 580 UMOTransformer transformer = endpoint.getResponseTransformer(); 581 582 if (transformer == null) 584 { 585 transformer = component.getDescriptor().getResponseTransformer(); 586 } 587 588 if (transformer == null) 590 { 591 return returnMessage; 592 } 593 594 if (returnMessage == null) 595 { 596 if (transformer.isAcceptNull()) 597 { 598 returnMessage = new MuleMessage(new NullPayload(), RequestContext.getEventContext() 599 .getMessage()); 600 } 601 else 602 { 603 return null; 604 } 605 } 606 607 Object returnPayload = returnMessage.getPayload(); 608 if (transformer.isSourceTypeSupported(returnPayload.getClass())) 609 { 610 Object result = transformer.transform(returnPayload); 611 if (result instanceof UMOMessage) 612 { 613 returnMessage = (UMOMessage)result; 614 } 615 else 616 { 617 returnMessage = new MuleMessage(result, returnMessage); 631 } 634 } 635 else 636 { 637 if (logger.isDebugEnabled()) 638 { 639 logger.debug("Response transformer: " + transformer + " doesn't support the result payload: " 640 + returnPayload.getClass()); 641 } 642 } 643 return returnMessage; 644 } 645 646 public void doStart() throws UMOException 647 { 648 } 650 651 public void doStop() throws UMOException 652 { 653 } 655 656 public abstract void doConnect() throws Exception ; 657 658 public abstract void doDisconnect() throws Exception ; 659 660 } 661 | Popular Tags |