1 10 11 package org.mule.impl.model; 12 13 import org.apache.commons.logging.Log; 14 import org.apache.commons.logging.LogFactory; 15 import org.mule.MuleManager; 16 import org.mule.config.MuleProperties; 17 import org.mule.config.i18n.Message; 18 import org.mule.config.i18n.Messages; 19 import org.mule.impl.ImmutableMuleDescriptor; 20 import org.mule.impl.InterceptorsInvoker; 21 import org.mule.impl.MuleDescriptor; 22 import org.mule.impl.MuleEvent; 23 import org.mule.impl.MuleMessage; 24 import org.mule.impl.RequestContext; 25 import org.mule.impl.endpoint.MuleEndpoint; 26 import org.mule.impl.endpoint.MuleEndpointURI; 27 import org.mule.impl.message.ExceptionPayload; 28 import org.mule.management.stats.ComponentStatistics; 29 import org.mule.providers.AbstractConnector; 30 import org.mule.providers.NullPayload; 31 import org.mule.providers.ReplyToHandler; 32 import org.mule.umo.MessagingException; 33 import org.mule.umo.UMOEvent; 34 import org.mule.umo.UMOException; 35 import org.mule.umo.UMOExceptionPayload; 36 import org.mule.umo.UMOImmutableDescriptor; 37 import org.mule.umo.UMOInterceptor; 38 import org.mule.umo.UMOMessage; 39 import org.mule.umo.endpoint.UMOEndpoint; 40 import org.mule.umo.endpoint.UMOEndpointURI; 41 import org.mule.umo.endpoint.UMOImmutableEndpoint; 42 import org.mule.umo.lifecycle.Disposable; 43 import org.mule.umo.lifecycle.Initialisable; 44 import org.mule.umo.lifecycle.UMOLifecycleAdapter; 45 import org.mule.umo.model.ModelException; 46 import org.mule.umo.model.UMOEntryPointResolver; 47 import org.mule.umo.model.UMOModel; 48 import org.mule.umo.provider.UMOMessageDispatcher; 49 import org.mule.util.ObjectPool; 50 import org.mule.util.queue.QueueSession; 51 52 import java.util.ArrayList ; 53 import java.util.Iterator ; 54 import java.util.List ; 55 import java.util.Map ; 56 57 61 62 public class DefaultMuleProxy implements MuleProxy 63 { 64 67 private static Log logger = LogFactory.getLog(DefaultMuleProxy.class); 68 69 72 private UMOEvent event; 73 74 77 private UMOLifecycleAdapter umo; 78 79 82 private ImmutableMuleDescriptor descriptor; 83 84 87 private boolean suspended = true; 88 89 private List interceptorList; 90 91 private ObjectPool proxyPool; 92 93 private ComponentStatistics stat = null; 94 95 private QueueSession queueSession = null; 96 97 104 public DefaultMuleProxy(Object component, MuleDescriptor descriptor, ObjectPool proxyPool) 105 throws UMOException 106 { 107 this.descriptor = new ImmutableMuleDescriptor(descriptor); 108 this.proxyPool = proxyPool; 109 110 UMOModel model = MuleManager.getInstance().getModel(); 111 112 UMOEntryPointResolver resolver = model.getEntryPointResolver(); 113 umo = model.getLifecycleAdapterFactory().create(component, descriptor, resolver); 114 115 interceptorList = new ArrayList (descriptor.getInterceptors().size() + 1); 116 interceptorList.addAll(descriptor.getInterceptors()); 117 interceptorList.add(umo); 118 119 for (Iterator iter = interceptorList.iterator(); iter.hasNext();) 120 { 121 UMOInterceptor interceptor = (UMOInterceptor)iter.next(); 122 if (interceptor instanceof Initialisable) 123 { 124 try 125 { 126 ((Initialisable)interceptor).initialise(); 127 } 128 catch (Exception e) 129 { 130 throw new ModelException(new Message(Messages.X_FAILED_TO_INITIALISE, 131 "Component '" + descriptor.getName() + "'"), e); 132 } 133 } 134 } 135 } 136 137 public void start() throws UMOException 138 { 139 checkDisposed(); 140 if (!umo.isStarted()) 141 { 142 try 143 { 144 umo.start(); 145 } 146 catch (Exception e) 147 { 148 throw new ModelException( 149 new Message(Messages.FAILED_TO_START_X, "Component '" + descriptor.getName() + "'"), e); 150 } 151 } 152 153 } 154 155 public boolean isStarted() 156 { 157 return umo.isStarted(); 158 } 159 160 public void stop() throws UMOException 161 { 162 checkDisposed(); 163 if (umo.isStarted()) 164 { 165 try 166 { 167 umo.stop(); 168 } 169 catch (Exception e) 170 { 171 throw new ModelException( 172 new Message(Messages.FAILED_TO_STOP_X, "Component '" + descriptor.getName() + "'"), e); 173 } 174 } 175 } 176 177 public void dispose() 178 { 179 checkDisposed(); 180 for (Iterator iter = interceptorList.iterator(); iter.hasNext();) 181 { 182 UMOInterceptor interceptor = (UMOInterceptor)iter.next(); 183 if (interceptor instanceof Disposable) 184 { 185 try 186 { 187 ((Disposable)interceptor).dispose(); 188 } 189 catch (Exception e) 190 { 191 logger.error(new Message(Messages.FAILED_TO_DISPOSE_X, "Component '" 192 + descriptor.getName() + "'"), e); 193 } 194 } 195 } 196 } 197 198 private void checkDisposed() 199 { 200 if (umo.isDisposed()) 201 { 202 throw new IllegalStateException ("Component has already been disposed of"); 203 } 204 } 205 206 211 public void onEvent(QueueSession session, UMOEvent event) 212 { 213 this.queueSession = session; 214 this.event = event; 215 } 216 217 public ComponentStatistics getStatistics() 218 { 219 return stat; 220 } 221 222 public void setStatistics(ComponentStatistics stat) 223 { 224 this.stat = stat; 225 } 226 227 234 public Object onCall(UMOEvent event) throws UMOException 235 { 236 if (logger.isTraceEnabled()) 237 { 238 logger.trace("MuleProxy: sync call for Mule UMO " + descriptor.getName()); 239 } 240 241 UMOMessage returnMessage = null; 242 try 243 { 244 if (event.getEndpoint().canReceive()) 245 { 246 RequestContext.setEvent(event); 247 Object replyTo = event.getMessage().getReplyTo(); 248 ReplyToHandler replyToHandler = getReplyToHandler(event.getMessage(), event.getEndpoint()); 249 InterceptorsInvoker invoker = new InterceptorsInvoker(interceptorList, descriptor, 250 event.getMessage()); 251 252 long startTime = 0; 254 if (stat.isEnabled()) 255 { 256 startTime = System.currentTimeMillis(); 257 } 258 returnMessage = invoker.execute(); 259 260 if (stat.isEnabled()) 262 { 263 stat.addExecutionTime(System.currentTimeMillis() - startTime); 264 } 265 event = RequestContext.getEvent(); 267 if (event.isStopFurtherProcessing()) 268 { 269 logger.debug("Event stop further processing has been set, no outbound routing will be performed."); 270 } 271 if (returnMessage != null && !event.isStopFurtherProcessing()) 272 { 273 if (descriptor.getOutboundRouter().hasEndpoints()) 274 { 275 UMOMessage outboundReturnMessage = descriptor.getOutboundRouter().route( 276 returnMessage, event.getSession(), event.isSynchronous()); 277 if (outboundReturnMessage != null) 278 { 279 returnMessage = outboundReturnMessage; 280 } 281 } 282 else 283 { 284 logger.debug("Outbound router on component '" + descriptor.getName() 285 + "' doesn't have any endpoints configured."); 286 } 287 } 288 289 if (returnMessage != null && descriptor.getResponseRouter() != null) 291 { 292 logger.debug("Waiting for response router message"); 293 returnMessage = descriptor.getResponseRouter().getResponse(returnMessage); 294 } 295 296 if (returnMessage != null && replyToHandler != null) 298 { 299 String requestor = (String )returnMessage.getProperty(MuleProperties.MULE_REPLY_TO_REQUESTOR_PROPERTY); 300 if ((requestor != null && !requestor.equals(descriptor.getName())) || requestor == null) 301 { 302 replyToHandler.processReplyTo(event, returnMessage, replyTo); 303 } 304 } 305 306 } 307 else 308 { 309 returnMessage = event.getSession().sendEvent(event); 310 processReplyTo(returnMessage); 311 } 312 313 if (stat.isEnabled()) 315 { 316 stat.incSentEventSync(); 317 } 318 } 319 catch (Exception e) 320 { 321 event.getSession().setValid(false); 322 if (e instanceof MessagingException) 323 { 324 handleException(e); 325 } 326 else 327 { 328 handleException(new MessagingException(new Message(Messages.EVENT_PROCESSING_FAILED_FOR_X, 329 descriptor.getName()), event.getMessage(), e)); 330 } 331 332 if (returnMessage == null) 333 { 334 returnMessage = new MuleMessage(new NullPayload(), (Map )null); 335 } 336 UMOExceptionPayload exceptionPayload = RequestContext.getExceptionPayload(); 337 if (exceptionPayload == null) 338 { 339 exceptionPayload = new ExceptionPayload(e); 340 } 341 returnMessage.setExceptionPayload(exceptionPayload); 342 } 343 return returnMessage; 344 } 345 346 352 public void handleException(Exception exception) 353 { 354 descriptor.getExceptionListener().exceptionThrown(exception); 355 } 356 357 362 public String toString() 363 { 364 return "proxy for: " + descriptor.toString(); 365 } 366 367 372 public boolean isSuspended() 373 { 374 return suspended; 375 } 376 377 380 public void suspend() 381 { 382 suspended = true; 383 } 384 385 388 public void resume() 389 { 390 suspended = false; 391 } 392 393 protected ReplyToHandler getReplyToHandler(UMOMessage message, UMOImmutableEndpoint endpoint) 394 { 395 Object replyTo = message.getReplyTo(); 396 ReplyToHandler replyToHandler = null; 397 if (replyTo != null) 398 { 399 replyToHandler = ((AbstractConnector)endpoint.getConnector()).getReplyToHandler(); 400 if (endpoint.getResponseTransformer() != null) 402 { 403 replyToHandler.setTransformer(endpoint.getResponseTransformer()); 404 } 405 } 406 return replyToHandler; 407 } 408 409 private void processReplyTo(UMOMessage returnMessage) throws UMOException 410 { 411 if (returnMessage != null && returnMessage.getReplyTo() != null) 412 { 413 if (logger.isDebugEnabled()) 414 { 415 logger.debug("sending reply to: " + returnMessage.getReplyTo()); 416 } 417 418 UMOEndpointURI endpointUri = new MuleEndpointURI(returnMessage.getReplyTo().toString()); 419 420 UMOEndpoint endpoint = MuleEndpoint.getOrCreateEndpointForUri(endpointUri, 422 UMOEndpoint.ENDPOINT_TYPE_SENDER); 423 424 returnMessage.removeProperty(MuleProperties.MULE_REPLY_TO_PROPERTY); 427 428 UMOEvent replyToEvent = new MuleEvent(returnMessage, endpoint, event.getSession(), false); 430 431 onEvent(queueSession, replyToEvent); 433 434 if (logger.isDebugEnabled()) 435 { 436 logger.debug("reply to sent: " + returnMessage.getReplyTo()); 437 } 438 439 if (stat.isEnabled()) 440 { 441 stat.incSentReplyToEvent(); 442 } 443 } 444 } 445 446 451 public void run() 452 { 453 if (logger.isTraceEnabled()) 454 { 455 logger.trace("MuleProxy: async onEvent for Mule UMO " + descriptor.getName()); 456 } 457 458 try 459 { 460 if (event.getEndpoint().canReceive()) 461 { 462 RequestContext.setEvent(event); 464 Object replyTo = event.getMessage().getReplyTo(); 465 ReplyToHandler replyToHandler = getReplyToHandler(event.getMessage(), event.getEndpoint()); 466 InterceptorsInvoker invoker = new InterceptorsInvoker(interceptorList, descriptor, 467 event.getMessage()); 468 469 long startTime = 0; 471 if (stat.isEnabled()) 472 { 473 startTime = System.currentTimeMillis(); 474 } 475 UMOMessage result = invoker.execute(); 476 if (stat.isEnabled()) 477 { 478 stat.addExecutionTime(System.currentTimeMillis() - startTime); 479 } 480 event = RequestContext.getEvent(); 482 if (result != null && !event.isStopFurtherProcessing()) 483 { 484 descriptor.getOutboundRouter().route(result, event.getSession(), event.isSynchronous()); 485 } 486 487 if (result != null && replyToHandler != null) 489 { 490 String requestor = (String )result.getProperty(MuleProperties.MULE_REPLY_TO_REQUESTOR_PROPERTY); 491 if ((requestor != null && !requestor.equals(descriptor.getName())) || requestor == null) 492 { 493 replyToHandler.processReplyTo(event, result, replyTo); 494 } 495 } 496 } 497 else 498 { 499 UMOMessageDispatcher dispatcher = event.getEndpoint().getConnector().getDispatcher( 500 event.getEndpoint()); 501 dispatcher.dispatch(event); 502 } 503 504 if (stat.isEnabled()) 505 { 506 stat.incSentEventASync(); 507 } 508 } 509 catch (Exception e) 510 { 511 event.getSession().setValid(false); 512 if (e instanceof MessagingException) 513 { 514 handleException(e); 515 } 516 else 517 { 518 handleException(new MessagingException(new Message(Messages.EVENT_PROCESSING_FAILED_FOR_X, 519 descriptor.getName()), event.getMessage(), e)); 520 } 521 } 522 finally 523 { 524 try 525 { 526 proxyPool.returnObject(this); 527 } 528 catch (Exception e2) 529 { 530 logger.error("Failed to return proxy: " + e2.getMessage(), e2); 531 } 532 getStatistics().setComponentPoolSize(proxyPool.getSize()); 533 } 534 } 535 536 public void release() 537 { 538 } 540 541 546 public UMOImmutableDescriptor getDescriptor() 547 { 548 return descriptor; 549 } 550 } 551 | Popular Tags |