1 10 11 package org.mule.impl.model.seda.optimised; 12 13 import org.apache.commons.logging.Log; 14 import org.apache.commons.logging.LogFactory; 15 import org.mule.config.i18n.Message; 16 import org.mule.config.i18n.Messages; 17 import org.mule.impl.ImmutableMuleDescriptor; 18 import org.mule.impl.MuleDescriptor; 19 import org.mule.impl.MuleMessage; 20 import org.mule.impl.RequestContext; 21 import org.mule.impl.model.MuleProxy; 22 import org.mule.management.stats.ComponentStatistics; 23 import org.mule.umo.MessagingException; 24 import org.mule.umo.UMOEvent; 25 import org.mule.umo.UMOEventContext; 26 import org.mule.umo.UMOException; 27 import org.mule.umo.UMOImmutableDescriptor; 28 import org.mule.umo.UMOMessage; 29 import org.mule.umo.lifecycle.Callable; 30 import org.mule.umo.lifecycle.Disposable; 31 import org.mule.umo.lifecycle.Startable; 32 import org.mule.umo.lifecycle.Stoppable; 33 import org.mule.umo.model.ModelException; 34 import org.mule.umo.provider.UMOMessageDispatcher; 35 import org.mule.util.ObjectPool; 36 import org.mule.util.queue.QueueSession; 37 38 45 46 public class OptimisedMuleProxy implements MuleProxy 47 { 48 51 private static Log logger = LogFactory.getLog(OptimisedMuleProxy.class); 52 53 56 private UMOEvent event; 57 58 61 private ImmutableMuleDescriptor descriptor; 62 63 66 private boolean suspended = true; 67 68 private ObjectPool proxyPool; 69 70 private ComponentStatistics stat = null; 71 72 75 private Callable umo; 76 77 private boolean started = false; 78 private boolean disposed = false; 79 80 87 public OptimisedMuleProxy(Callable component, MuleDescriptor descriptor, ObjectPool proxyPool) 88 throws UMOException 89 { 90 this.descriptor = new ImmutableMuleDescriptor(descriptor); 91 this.proxyPool = proxyPool; 92 umo = component; 93 } 94 95 public void start() throws UMOException 96 { 97 checkDisposed(); 98 if (!started && umo instanceof Startable) 99 { 100 try 101 { 102 ((Startable)umo).start(); 103 started = true; 104 } 105 catch (Exception e) 106 { 107 throw new ModelException( 108 new Message(Messages.FAILED_TO_START_X, "Component '" + descriptor.getName() + "'"), e); 109 } 110 } 111 112 } 113 114 public boolean isStarted() 115 { 116 return started; 117 } 118 119 public void stop() throws UMOException 120 { 121 checkDisposed(); 122 123 if (started && umo instanceof Stoppable) 124 { 125 started = false; 126 try 127 { 128 ((Stoppable)umo).stop(); 129 } 130 catch (Exception e) 131 { 132 throw new ModelException( 133 new Message(Messages.FAILED_TO_STOP_X, "Component '" + descriptor.getName() + "'"), e); 134 } 135 } 136 } 137 138 public void dispose() 139 { 140 checkDisposed(); 141 if (umo instanceof Disposable) 142 { 143 ((Disposable)umo).dispose(); 144 disposed = true; 145 } 146 } 147 148 private void checkDisposed() 149 { 150 if (disposed) 151 { 152 throw new IllegalStateException ("Components Disposed Of"); 153 } 154 } 155 156 161 public void onEvent(QueueSession session, UMOEvent event) 162 { 163 this.event = event; 165 } 166 167 public ComponentStatistics getStatistics() 168 { 169 return stat; 170 } 171 172 public void setStatistics(ComponentStatistics stat) 173 { 174 this.stat = stat; 175 } 176 177 184 public Object onCall(UMOEvent event) throws UMOException 185 { 186 if (logger.isTraceEnabled()) 187 { 188 logger.trace("MuleProxy: sync call for Mule UMO " + descriptor.getName()); 189 } 190 191 UMOMessage returnMessage = null; 192 try 193 { 194 if (event.getEndpoint().canReceive()) 195 { 196 204 long startTime = 0; 206 if (stat.isEnabled()) 207 { 208 startTime = System.currentTimeMillis(); 209 } 210 returnMessage = invokeUmo(RequestContext.getEventContext()); 211 if (stat.isEnabled()) 213 { 214 stat.addExecutionTime(System.currentTimeMillis() - startTime); 215 } 216 event = RequestContext.getEvent(); 218 if (event.isStopFurtherProcessing()) 219 { 220 logger.debug("Event stop further processing has been set, no outbound routing will be performed."); 221 } 222 if (returnMessage != null && !event.isStopFurtherProcessing()) 223 { 224 if (descriptor.getOutboundRouter().hasEndpoints()) 229 { 230 UMOMessage outboundReturnMessage = descriptor.getOutboundRouter().route( 231 returnMessage, event.getSession(), event.isSynchronous()); 232 if (outboundReturnMessage != null) 233 { 234 returnMessage = outboundReturnMessage; 235 } 236 } 237 else 238 { 239 logger.debug("Outbound router on component '" + descriptor.getName() 240 + "' doesn't have any endpoints configured."); 241 } 242 } 243 244 262 } 263 else 264 { 265 returnMessage = event.getSession().sendEvent(event); 266 } 268 269 if (stat.isEnabled()) 271 { 272 stat.incSentEventSync(); 273 } 274 } 275 catch (Exception e) 276 { 277 event.getSession().setValid(false); 278 if (e instanceof UMOException) 279 { 280 handleException(e); 281 } 282 else 283 { 284 handleException(new MessagingException(new Message(Messages.EVENT_PROCESSING_FAILED_FOR_X, 285 descriptor.getName()), event.getMessage(), e)); 286 } 287 } 288 return returnMessage; 289 } 290 291 protected UMOMessage invokeUmo(UMOEventContext context) throws Exception 292 { 293 Object result = umo.onCall(RequestContext.getEventContext()); 294 if (result != null) 295 { 296 if (result instanceof UMOMessage) 297 { 298 return (UMOMessage)result; 299 } 300 else 301 { 302 return new MuleMessage(result, context.getMessage()); 303 } 304 } 305 return null; 306 } 307 308 314 public void handleException(Exception exception) 315 { 316 descriptor.getExceptionListener().exceptionThrown(exception); 317 } 318 319 324 public String toString() 325 { 326 return "optimised proxy for: " + descriptor.toString(); 327 } 328 329 334 public boolean isSuspended() 335 { 336 return suspended; 337 } 338 339 342 public void suspend() 343 { 344 suspended = true; 345 } 346 347 350 public void resume() 351 { 352 suspended = false; 353 } 354 355 382 387 public void run() 388 { 389 if (logger.isTraceEnabled()) 390 { 391 logger.trace("MuleProxy: async onEvent for Mule UMO " + descriptor.getName()); 392 } 393 394 try 395 { 396 if (event.getEndpoint().canReceive()) 397 { 398 RequestContext.setEvent(event); 400 410 long startTime = 0; 412 if (stat.isEnabled()) 413 { 414 startTime = System.currentTimeMillis(); 415 } 416 UMOMessage result = invokeUmo(RequestContext.getEventContext()); 417 if (stat.isEnabled()) 418 { 419 stat.addExecutionTime(System.currentTimeMillis() - startTime); 420 } 421 event = RequestContext.getEvent(); 423 if (result != null && !event.isStopFurtherProcessing()) 424 { 425 descriptor.getOutboundRouter().route(result, event.getSession(), event.isSynchronous()); 426 } 427 428 } 438 else 439 { 440 UMOMessageDispatcher dispatcher = event.getEndpoint().getConnector().getDispatcher( 441 event.getEndpoint()); 442 dispatcher.dispatch(event); 443 } 444 445 if (stat.isEnabled()) 446 { 447 stat.incSentEventASync(); 448 } 449 } 450 catch (Exception e) 451 { 452 event.getSession().setValid(false); 453 if (e instanceof UMOException) 454 { 455 handleException(e); 456 } 457 else 458 { 459 handleException(new MessagingException(new Message(Messages.EVENT_PROCESSING_FAILED_FOR_X, 460 descriptor.getName()), event.getMessage(), e)); 461 } 462 } 463 finally 464 { 465 466 try 467 { 468 proxyPool.returnObject(this); 469 } 470 catch (Exception e2) 471 { 472 logger.error("Failed to return proxy: " + e2.getMessage(), e2); 473 } 474 getStatistics().setComponentPoolSize(proxyPool.getSize()); 475 } 476 } 477 478 public void release() 479 { 480 } 482 483 488 public UMOImmutableDescriptor getDescriptor() 489 { 490 return descriptor; 491 } 492 } 493 | Popular Tags |