package org.mule.impl.model.seda;

import org.mule.MuleManager;
import org.mule.MuleRuntimeException;
import org.mule.config.PoolingProfile;
import org.mule.config.ThreadingProfile;
import org.mule.config.i18n.Message;
import org.mule.config.i18n.Messages;
import org.mule.impl.FailedToQueueEventException;
import org.mule.impl.MuleDescriptor;
import org.mule.impl.MuleEvent;
import org.mule.impl.model.AbstractComponent;
import org.mule.impl.model.DefaultMuleProxy;
import org.mule.impl.model.MuleProxy;
import org.mule.umo.ComponentException;
import org.mule.umo.UMOEvent;
import org.mule.umo.UMOException;
import org.mule.umo.UMOMessage;
import org.mule.umo.lifecycle.InitialisationException;
import org.mule.umo.lifecycle.LifecycleException;
import org.mule.umo.manager.UMOWorkManager;
import org.mule.util.ObjectPool;
import org.mule.util.queue.QueueSession;

import javax.resource.spi.work.Work;
import javax.resource.spi.work.WorkEvent;
import javax.resource.spi.work.WorkException;
import javax.resource.spi.work.WorkListener;
import javax.resource.spi.work.WorkManager;
import java.util.NoSuchElementException;

/**
 * A component whose asynchronous events are staged on a named queue and
 * consumed by a long-running {@link Work} (this object's {@link #run()}
 * method). Each dequeued event is handed to a {@link MuleProxy}, which is then
 * scheduled on the component's work manager.
 * <p>
 * The proxy used for an event comes from one of three places, in this order:
 * the proxy pool when one exists, a freshly created proxy when
 * <code>componentPerRequest</code> is set, or the single shared
 * <code>componentProxy</code> otherwise.
 */
public class SedaComponent extends AbstractComponent implements Work, WorkListener
{
    /**
     * Serial version
     */
    private static final long serialVersionUID = 7711976708670893015L;

    /**
     * Pool of component proxies; <code>null</code> until the pool has been
     * created by {@link #initialisePool()}.
     */
    protected ObjectPool proxyPool = null;

    /**
     * The single proxy used when pooling is disabled and a new component is
     * not created per request.
     */
    protected MuleProxy componentProxy = null;

    /** Work manager on which the queue listener and the proxies are scheduled. */
    protected UMOWorkManager workManager;

    /** Name of the queue events are put on and polled from. */
    protected String descriptorQueueName;

    /**
     * Timeout passed to the queue's <code>poll</code> call when dequeuing.
     */
    protected int queueTimeout = 0;

    /**
     * Whether component proxies are pooled.
     */
    protected boolean enablePooling = true;

    /**
     * Whether a new component proxy is created for every request when no pool
     * is in use.
     */
    protected boolean componentPerRequest = false;

    /**
     * Creates the component and copies the queue timeout, pooling flag and
     * component-per-request flag from the model.
     *
     * @param descriptor the descriptor of the component; its name is used to
     *            derive the event queue name
     * @param model the SEDA model supplying the queueing and pooling settings
     */
    public SedaComponent(MuleDescriptor descriptor, SedaModel model)
    {
        super(descriptor, model);
        descriptorQueueName = descriptor.getName() + ".component";
        queueTimeout = model.getQueueTimeout();
        enablePooling = model.isEnablePooling();
        componentPerRequest = model.isComponentPerRequest();
    }

    /**
     * Creates the work manager from the descriptor's threading profile and
     * configures the component queue from the descriptor's queue profile.
     *
     * @throws InitialisationException if the queue cannot be configured
     */
    public synchronized void doInitialise() throws InitialisationException
    {
        ThreadingProfile tp = descriptor.getThreadingProfile();
        workManager = tp.createWorkManager(descriptor.getName());
        try
        {
            // NOTE(review): the queue is configured under descriptor.getName(),
            // whereas events are put on and polled from descriptorQueueName
            // (name + ".component"). Confirm whether the profile is meant to
            // apply to the latter.
            descriptor.getQueueProfile().configureQueue(descriptor.getName());
        }
        catch (InitialisationException e)
        {
            throw e;
        }
        catch (Throwable e)
        {
            throw new InitialisationException(
                new Message(Messages.X_FAILED_TO_INITIALISE, "Component Queue"), e, this);
        }
    }

    /**
     * Creates the proxy pool and pre-populates it according to the pooling
     * profile's initialisation policy, then marks the pool as initialised.
     *
     * @throws InitialisationException if the pool cannot be created or
     *             populated
     */
    protected void initialisePool() throws InitialisationException
    {
        try
        {
            PoolingProfile profile = descriptor.getPoolingProfile();
            proxyPool = profile.getPoolFactory().createPool(descriptor);

            int policy = profile.getInitialisationPolicy();
            if (policy == PoolingProfile.POOL_INITIALISE_ALL_COMPONENTS)
            {
                // Borrowing and immediately returning forces the pool to
                // create an object.
                int threads = profile.getMaxActive();
                for (int i = 0; i < threads; i++)
                {
                    proxyPool.returnObject(proxyPool.borrowObject());
                }
            }
            else if (policy == PoolingProfile.POOL_INITIALISE_ONE_COMPONENT)
            {
                proxyPool.returnObject(proxyPool.borrowObject());
            }

            poolInitialised.set(true);
        }
        catch (Exception e)
        {
            throw new InitialisationException(new Message(Messages.X_FAILED_TO_INITIALISE, "Proxy Pool"), e,
                this);
        }
    }

    /**
     * Looks up the component instance, wraps it in a started
     * {@link DefaultMuleProxy} and returns it. The component pool size
     * statistic is set to -1.
     *
     * @return a started proxy bound to this component's statistics
     * @throws InitialisationException if the lookup or proxy start fails
     */
    protected MuleProxy createComponentProxy() throws InitialisationException
    {
        try
        {
            Object component = lookupComponent();
            // Local deliberately not named 'componentProxy' so that it does not
            // shadow the field of that name.
            MuleProxy proxy = new DefaultMuleProxy(component, descriptor, null);
            getStatistics().setComponentPoolSize(-1);
            proxy.setStatistics(getStatistics());
            proxy.start();
            return proxy;
        }
        catch (UMOException e)
        {
            throw new InitialisationException(e, this);
        }
    }

    /**
     * Stops the component; identical to {@link #doStop()}.
     *
     * @throws UMOException if stopping fails
     */
    public void doForceStop() throws UMOException
    {
        doStop();
    }

    /**
     * Stops the work manager and then either the proxy pool (which is also
     * cleared and marked uninitialised) or the single component proxy.
     *
     * @throws UMOException if the work manager or the component proxy fails to
     *             stop
     */
    public void doStop() throws UMOException
    {
        // Guarded like doDispose(): stop may be requested before initialise.
        if (workManager != null)
        {
            workManager.stop();
        }
        if (proxyPool != null)
        {
            try
            {
                proxyPool.stop();
                proxyPool.clearPool();
            }
            catch (Exception e)
            {
                logger.error("Failed to stop component pool: " + e.getMessage(), e);
            }
            poolInitialised.set(false);
        }
        else if (componentProxy != null)
        {
            componentProxy.stop();
        }
    }

    /**
     * Prepares the proxies (pool or single proxy, depending on configuration),
     * starts the work manager and schedules this object as the queue listener.
     *
     * @throws UMOException a {@link LifecycleException} wrapping any failure
     */
    public void doStart() throws UMOException
    {
        try
        {
            if (enablePooling)
            {
                // Only build the pool once; when it is already initialised
                // there is nothing to create, and in particular no singleton
                // proxy should be created alongside the pool.
                if (!poolInitialised.get())
                {
                    initialisePool();
                    proxyPool.start();
                }
            }
            else if (!componentPerRequest)
            {
                componentProxy = createComponentProxy();
            }
            workManager.start();
            workManager.scheduleWork(this, WorkManager.INDEFINITE, null, this);
        }
        catch (Exception e)
        {
            throw new LifecycleException(new Message(Messages.FAILED_TO_START_X, "Component: "
                                                                                 + descriptor.getName()), e,
                this);
        }
    }

    /**
     * Disposes the work manager and then clears the proxy pool or disposes the
     * single component proxy. Failures are logged and not propagated.
     */
    protected void doDispose()
    {
        try
        {
            if (workManager != null)
            {
                workManager.dispose();
            }
        }
        catch (Exception e)
        {
            logger.error("Component Thread Pool did not close properly: " + e, e);
        }
        try
        {
            if (proxyPool != null)
            {
                proxyPool.clearPool();
            }
            else if (componentProxy != null)
            {
                componentProxy.dispose();
            }
        }
        catch (Exception e)
        {
            logger.error("Proxy Pool did not close properly: " + e, e);
        }
    }

    /**
     * Places the event on the component queue for asynchronous processing.
     * A failure to enqueue is wrapped in a {@link FailedToQueueEventException}
     * and passed to <code>handleException</code> rather than thrown.
     *
     * @param event the event to queue
     * @throws UMOException declared by the contract; not thrown directly here
     */
    protected void doDispatch(UMOEvent event) throws UMOException
    {
        if (stats.isEnabled())
        {
            stats.incReceivedEventASync();
        }
        if (logger.isDebugEnabled())
        {
            logger.debug("Component: " + descriptor.getName() + " has received asynchronous event on: "
                         + event.getEndpoint().getEndpointURI());
        }

        try
        {
            enqueue(event);
            if (stats.isEnabled())
            {
                stats.incQueuedEvent();
            }
        }
        catch (Exception e)
        {
            FailedToQueueEventException e1 = new FailedToQueueEventException(new Message(
                Messages.INTERRUPTED_QUEUING_EVENT_FOR_X, getName()), event.getMessage(), this, e);
            handleException(e1);
        }

        if (logger.isTraceEnabled())
        {
            logger.trace("Event added to queue for: " + descriptor.getName());
        }
    }

    /**
     * Processes the event synchronously on the calling thread using a proxy
     * and returns the proxy's result. A pooled proxy is returned to the pool
     * afterwards; a per-request proxy is disposed.
     *
     * @param event the event to process
     * @return the message produced by the proxy
     * @throws UMOException if the proxy throws one, or a
     *             {@link ComponentException} wrapping any other failure
     */
    public UMOMessage doSend(UMOEvent event) throws UMOException
    {
        UMOMessage result = null;
        MuleProxy proxy = null;
        try
        {
            proxy = acquireProxy();
            proxy.setStatistics(getStatistics());

            if (logger.isDebugEnabled())
            {
                logger.debug(this + " : got proxy for " + event.getId() + " = " + proxy);
            }
            result = (UMOMessage)proxy.onCall(event);
        }
        catch (UMOException e)
        {
            throw e;
        }
        catch (Exception e)
        {
            throw new ComponentException(event.getMessage(), this, e);
        }
        finally
        {
            try
            {
                if (proxy != null)
                {
                    if (proxyPool != null)
                    {
                        proxyPool.returnObject(proxy);
                    }
                    else if (componentPerRequest)
                    {
                        proxy.dispose();
                    }
                }
            }
            catch (Exception e)
            {
                // Note: thrown from finally, so this replaces any exception
                // already propagating from the try block.
                throw new ComponentException(event.getMessage(), this, e);
            }

            if (proxyPool != null)
            {
                getStatistics().setComponentPoolSize(proxyPool.getSize());
            }
        }
        return result;
    }

    /**
     * @return the proxy pool, or <code>null</code> if none has been created
     */
    ObjectPool getProxyPool()
    {
        return proxyPool;
    }

    /**
     * Returns the number of events currently waiting on the component queue,
     * i.e. the queue that {@link #enqueue(UMOEvent)} writes to.
     *
     * @return the size of the component's event queue
     */
    public int getQueueSize()
    {
        QueueSession queueSession = MuleManager.getInstance().getQueueManager().getQueueSession();
        // Use the same queue name as enqueue()/dequeue(); the bare descriptor
        // name identifies a different queue that never receives these events.
        return queueSession.getQueue(descriptorQueueName).size();
    }

    /**
     * Queue listener loop. Until the component is stopped, polls the component
     * queue and, for each event received, obtains a proxy, gives it the event
     * and schedules it on the work manager. While stopping, the loop exits once
     * the queue is empty. Failures are reported through
     * <code>handleException</code>; an interrupt ends the loop.
     */
    public void run()
    {
        QueueSession queueSession = MuleManager.getInstance().getQueueManager().getQueueSession();

        while (!stopped.get())
        {
            // Scoped per iteration so that a failure in a later iteration can
            // never return or dispose a proxy (or report an event) that
            // belongs to an earlier one.
            MuleEvent event = null;
            MuleProxy proxy = null;
            try
            {
                // Blocks while the component is paused.
                paused.whenFalse(null);

                // When stopping, drain the queue before leaving the loop.
                if (stopping.get())
                {
                    if (queueSession == null || queueSession.getQueue(descriptorQueueName).size() == 0)
                    {
                        stopping.set(false);
                        break;
                    }
                }

                event = (MuleEvent)dequeue();
                if (event != null)
                {
                    if (stats.isEnabled())
                    {
                        stats.decQueuedEvent();
                    }

                    if (logger.isDebugEnabled())
                    {
                        logger.debug("Component: " + descriptor.getName() + " dequeued event on: "
                                     + event.getEndpoint().getEndpointURI());
                    }

                    proxy = acquireProxy();
                    proxy.setStatistics(getStatistics());
                    proxy.start();
                    proxy.onEvent(queueSession, event);
                    workManager.scheduleWork(proxy, WorkManager.INDEFINITE, null, this);
                }
            }
            catch (Exception e)
            {
                if (proxy != null && proxyPool != null)
                {
                    try
                    {
                        proxyPool.returnObject(proxy);
                    }
                    catch (Exception e2)
                    {
                        logger.info("Failed to return proxy to pool", e2);
                    }
                }

                if (e instanceof InterruptedException)
                {
                    stopping.set(false);
                    break;
                }
                handleException(toReportableException(e, event));
            }
            finally
            {
                stopping.set(false);
                if (proxy != null && componentPerRequest)
                {
                    proxy.dispose();
                }
            }
        }
    }

    /**
     * Called by the work manager to release this work; clears the stopping
     * flag.
     */
    public void release()
    {
        stopping.set(false);
    }

    /**
     * Puts the event on the component queue.
     *
     * @param event the event to queue
     * @throws Exception if the queue rejects the event or the put is
     *             interrupted
     */
    protected void enqueue(UMOEvent event) throws Exception
    {
        QueueSession session = MuleManager.getInstance().getQueueManager().getQueueSession();
        session.getQueue(descriptorQueueName).put(event);
    }

    /**
     * Polls the component queue using the configured queue timeout.
     *
     * @return whatever the queue's <code>poll</code> returns, cast to
     *         {@link UMOEvent}; {@link #run()} treats <code>null</code> as
     *         "no event"
     * @throws Exception if polling fails or is interrupted
     */
    protected UMOEvent dequeue() throws Exception
    {
        QueueSession queueSession = MuleManager.getInstance().getQueueManager().getQueueSession();
        return (UMOEvent)queueSession.getQueue(descriptorQueueName).poll(queueTimeout);
    }

    /** Reports any exception attached to the work event. */
    public void workAccepted(WorkEvent event)
    {
        handleWorkException(event, "workAccepted");
    }

    /** Reports any exception attached to the work event. */
    public void workRejected(WorkEvent event)
    {
        handleWorkException(event, "workRejected");
    }

    /** Reports any exception attached to the work event. */
    public void workStarted(WorkEvent event)
    {
        handleWorkException(event, "workStarted");
    }

    /** Reports any exception attached to the work event. */
    public void workCompleted(WorkEvent event)
    {
        handleWorkException(event, "workCompleted");
    }

    /**
     * If the work event carries an exception, logs it and passes it (or its
     * cause, when it has one) to <code>handleException</code>. Does nothing
     * when the event is <code>null</code> or carries no exception.
     *
     * @param event the work event to inspect
     * @param type the name of the listener callback, used in the log message
     * @throws MuleRuntimeException if the underlying throwable is not an
     *             {@link Exception}
     */
    protected void handleWorkException(WorkEvent event, String type)
    {
        if (event == null || event.getException() == null)
        {
            return;
        }

        // Prefer the root cause over the WorkException wrapper.
        Throwable e = event.getException();
        if (e.getCause() != null)
        {
            e = e.getCause();
        }

        logger.error("Work caused exception on '" + type + "'. Work being executed was: "
                     + event.getWork().toString());

        if (e instanceof Exception)
        {
            handleException((Exception)e);
        }
        else
        {
            throw new MuleRuntimeException(new Message(Messages.COMPONENT_CAUSED_ERROR_IS_X, getName()), e);
        }
    }

    /**
     * Obtains the proxy to use for one event: borrowed from the pool when a
     * pool exists (updating the pool size statistic), newly created when
     * component-per-request is set, otherwise the shared component proxy.
     *
     * @return the proxy to use; may be <code>null</code> if the shared proxy
     *         has not been created
     * @throws Exception if borrowing from the pool or creating a proxy fails
     */
    private MuleProxy acquireProxy() throws Exception
    {
        if (proxyPool != null)
        {
            MuleProxy pooled = (MuleProxy)proxyPool.borrowObject();
            getStatistics().setComponentPoolSize(proxyPool.getSize());
            return pooled;
        }
        if (componentPerRequest)
        {
            return createComponentProxy();
        }
        return componentProxy;
    }

    /**
     * Maps an exception caught in {@link #run()} to the exception that is
     * reported through <code>handleException</code>.
     *
     * @param e the exception that was caught
     * @param event the event being processed, or <code>null</code> if none had
     *            been dequeued
     * @return <code>e</code> itself when it is a {@link UMOException},
     *         otherwise a {@link ComponentException} wrapping it
     */
    private Exception toReportableException(Exception e, MuleEvent event)
    {
        UMOMessage message = (event == null ? null : event.getMessage());

        if (e instanceof NoSuchElementException)
        {
            return new ComponentException(new Message(Messages.PROXY_POOL_TIMED_OUT), message, this, e);
        }
        if (e instanceof UMOException)
        {
            return e;
        }
        if (e instanceof WorkException)
        {
            return new ComponentException(new Message(Messages.EVENT_PROCESSING_FAILED_FOR_X,
                descriptor.getName()), message, this, e);
        }
        return new ComponentException(new Message(Messages.FAILED_TO_GET_POOLED_OBJECT), message, this, e);
    }

}