1 10 11 package org.mule.providers.jbi.components; 12 13 import org.apache.commons.logging.Log; 14 import org.apache.commons.logging.LogFactory; 15 import org.mule.MuleManager; 16 import org.mule.config.converters.QNameConverter; 17 import org.mule.util.concurrent.WaitableBoolean; 18 import org.w3c.dom.Document ; 19 import org.w3c.dom.DocumentFragment ; 20 21 import javax.jbi.JBIException; 22 import javax.jbi.component.Component; 23 import javax.jbi.component.ComponentContext; 24 import javax.jbi.component.ComponentLifeCycle; 25 import javax.jbi.component.ServiceUnitManager; 26 import javax.jbi.management.DeploymentException; 27 import javax.jbi.messaging.DeliveryChannel; 28 import javax.jbi.messaging.ExchangeStatus; 29 import javax.jbi.messaging.Fault; 30 import javax.jbi.messaging.MessageExchange; 31 import javax.jbi.messaging.MessageExchangeFactory; 32 import javax.jbi.messaging.MessagingException; 33 import javax.jbi.servicedesc.ServiceEndpoint; 34 import javax.management.MBeanServer ; 35 import javax.management.ObjectName ; 36 import javax.resource.spi.work.Work ; 37 import javax.resource.spi.work.WorkException ; 38 import javax.resource.spi.work.WorkManager ; 39 import javax.xml.namespace.QName ; 40 41 import java.util.HashMap ; 42 import java.util.Map ; 43 44 51 public abstract class AbstractJbiComponent implements Component, Work , ComponentLifeCycle 52 { 53 54 public static final String IN = "in"; 55 public static final String OUT = "out"; 56 59 protected transient Log logger = LogFactory.getLog(getClass()); 60 61 protected ComponentContext context; 62 63 protected Map serviceDescriptions = new HashMap (); 64 65 protected QName service; 66 67 protected String name; 68 69 protected WorkManager workManager; 70 71 protected DeliveryChannel deliveryChannel; 72 73 protected ObjectName mbeanName; 74 75 protected ServiceUnitManager serviceUnitManager; 76 77 protected MessageExchangeFactory exchangeFactory; 78 79 protected WaitableBoolean started = new WaitableBoolean(false); 80 81 public ComponentLifeCycle getLifeCycle() 82 { 83 return this; 84 } 85 86 public ServiceUnitManager getServiceUnitManager() 87 { 88 return serviceUnitManager; 89 } 90 91 public Document getServiceDescription(ServiceEndpoint endpoint) 92 { 93 if (logger.isDebugEnabled()) 94 { 95 logger.debug("Querying service description for " + endpoint); 96 } 97 String key = getKey(endpoint); 98 Document doc = (Document )this.serviceDescriptions.get(key); 99 if (logger.isDebugEnabled()) 100 { 101 if (doc != null) 102 { 103 logger.debug("Description found"); 104 } 105 else 106 { 107 logger.debug("Description not found"); 108 } 109 } 110 return doc; 111 } 112 113 public void setServiceDescription(ServiceEndpoint endpoint, Document doc) 114 { 115 if (logger.isDebugEnabled()) 116 { 117 logger.debug("Setting service description for " + endpoint); 118 } 119 String key = getKey(endpoint); 120 this.serviceDescriptions.put(key, doc); 121 } 122 123 private String getKey(ServiceEndpoint endpoint) 124 { 125 StringBuffer sb = new StringBuffer (); 126 sb.append("{"); 127 sb.append(endpoint.getServiceName().getNamespaceURI()); 128 sb.append("}"); 129 sb.append(endpoint.getServiceName().getLocalPart()); 130 sb.append(":"); 131 sb.append(endpoint.getEndpointName()); 132 return sb.toString(); 133 } 134 135 141 public boolean isExchangeWithConsumerOkay(ServiceEndpoint endpoint, MessageExchange exchange) 142 { 143 return true; 144 } 145 146 152 public boolean isExchangeWithProviderOkay(ServiceEndpoint endpoint, MessageExchange exchange) 153 { 154 return true; 155 } 156 157 162 public ServiceEndpoint resolveEndpointReference(DocumentFragment epr) 163 { 164 return null; 165 } 166 167 protected ObjectName createExtensionMBeanName() throws Exception 168 { 169 return this.context.getMBeanNames().createCustomComponentMBeanName("extension"); 170 } 171 172 public Object getExtensionMBean() 173 { 174 return null; } 176 177 public QName getService() 178 { 179 return service; 180 } 181 182 public void setService(QName service) 183 { 184 this.service = service; 185 } 186 187 public String getName() 188 { 189 return name; 190 } 191 192 public void setName(String name) 193 { 194 this.name = name; 195 } 196 197 public WorkManager getWorkManager() 198 { 199 return workManager; 200 } 201 202 public void setWorkManager(WorkManager workManager) 203 { 204 this.workManager = workManager; 205 } 206 207 209 214 public synchronized final void init(ComponentContext context) throws JBIException 215 { 216 try 217 { 218 if (context.getComponentName() != null) 219 { 220 name = context.getComponentName(); 221 } 222 if (name == null) 223 { 224 throw new NullPointerException ("No name has been set for this component"); 225 } 226 227 if (service == null) 228 { 229 service = (QName )new QNameConverter().convert(QName .class, name); 230 } 231 232 context.activateEndpoint(service, service.getLocalPart()); 233 234 if (logger.isDebugEnabled()) 235 { 236 logger.debug("Initializing component: " + name); 237 } 238 this.context = context; 239 deliveryChannel = context.getDeliveryChannel(); 240 exchangeFactory = deliveryChannel.createExchangeFactory(); 241 Object mbean = getExtensionMBean(); 242 if (serviceUnitManager == null) 243 { 244 serviceUnitManager = new DefaultServiceUnitManager(); 245 } 246 247 if (workManager == null) 248 { 249 workManager = MuleManager.getInstance().getWorkManager(); 250 } 251 252 if (mbean != null) 253 { 254 if (mbeanName == null) 255 { 256 this.mbeanName = createExtensionMBeanName(); 257 } 258 MBeanServer server = AbstractJbiComponent.this.context.getMBeanServer(); 259 if (server == null) 260 { 261 throw new JBIException("null mBeanServer"); 262 } 263 264 if (server.isRegistered(this.mbeanName)) 265 { 266 server.unregisterMBean(this.mbeanName); 267 } 268 server.registerMBean(mbean, this.mbeanName); 269 } 270 271 doInit(); 272 if (logger.isDebugEnabled()) 273 { 274 logger.debug("Jbi Receiver Component initialized: " + name); 275 } 276 } 277 catch (JBIException e) 278 { 279 throw e; 280 } 281 catch (Exception e) 282 { 283 throw new JBIException("Error calling init on " + name, e); 284 } 285 } 286 287 public final void shutDown() throws JBIException 288 { 289 try 290 { 291 if (logger.isDebugEnabled()) 292 { 293 logger.debug("Shutting down component: " + getName()); 294 } 295 started.set(false); 296 doShutdown(); 297 if (this.mbeanName != null) 298 { 299 MBeanServer server = context.getMBeanServer(); 300 if (server == null) 301 { 302 throw new JBIException("null mBeanServer"); 303 } 304 if (server.isRegistered(this.mbeanName)) 305 { 306 server.unregisterMBean(this.mbeanName); 307 } 308 } 309 this.context = null; 310 if (logger.isDebugEnabled()) 311 { 312 logger.debug("Component shut down: " + getName()); 313 } 314 } 315 catch (JBIException e) 316 { 317 throw e; 318 } 319 catch (Exception e) 320 { 321 throw new JBIException("Error calling shutdown on " + getName(), e); 322 } 323 } 324 325 public final void start() throws JBIException 326 { 327 logger.debug("Starting Mule Jbi component: " + name); 328 started.set(true); 329 if (this instanceof MessageExchangeListener) 330 { 331 try 332 { 333 logger.debug("Starting ME thread for: " + name); 334 getWorkManager().scheduleWork(this); 335 } 336 catch (WorkException e) 337 { 338 throw new JBIException(e); 339 } 340 } 341 doStart(); 342 343 } 344 345 public final void stop() throws JBIException 346 { 347 started.set(false); 348 doStop(); 349 } 350 351 protected void doInit() throws JBIException 352 { 353 } 355 356 protected void doStart() throws JBIException 357 { 358 } 360 361 protected void doStop() throws JBIException 362 { 363 } 365 366 protected void doShutdown() throws JBIException 367 { 368 } 370 371 public ObjectName getExtensionMBeanName() 372 { 373 return mbeanName; 374 } 375 376 public void setExtensionMBeanName(ObjectName mbeanName) 377 { 378 this.mbeanName = mbeanName; 379 } 380 381 public void release() 382 { 383 } 385 386 389 public void run() 390 { 391 while (started.get()) 392 { 393 try 394 { 395 final MessageExchange me = deliveryChannel.accept(); 396 if (me != null) 397 { 398 getWorkManager().scheduleWork( 399 new MessageExchangeWorker(me, (MessageExchangeListener)this)); 400 } 401 } 402 catch (Exception e) 403 { 404 logger.error(e.getMessage(), e); 405 } 406 } 407 } 408 409 protected class MessageExchangeWorker implements Work 410 { 411 private MessageExchange me; 412 private MessageExchangeListener listener; 413 414 public MessageExchangeWorker(MessageExchange me, MessageExchangeListener listener) 415 { 416 this.me = me; 417 this.listener = listener; 418 } 419 420 public void release() 421 { 422 } 424 425 public void run() 426 { 427 try 428 { 429 try 430 { 431 listener.onExchange(me); 432 done(me); 433 } 434 catch (MessagingException e) 435 { 436 error(me, e); 437 } 438 } 439 catch (Exception e) 440 { 441 handleException(e); 442 } 443 } 444 } 445 446 protected void handleException(Throwable t) 447 { 448 logger.error(t.getMessage(), t); 449 } 450 451 protected void error(MessageExchange me, Fault fault) throws MessagingException 452 { 453 me.setFault(fault); 454 me.setStatus(ExchangeStatus.ERROR); 455 context.getDeliveryChannel().send(me); 456 } 457 458 protected void done(MessageExchange me) throws MessagingException 459 { 460 me.setStatus(ExchangeStatus.DONE); 461 context.getDeliveryChannel().send(me); 462 } 463 464 protected void error(MessageExchange me, Exception e) throws MessagingException 465 { 466 me.setError(e); 467 me.setStatus(ExchangeStatus.ERROR); 468 context.getDeliveryChannel().send(me); 469 } 470 471 private class DefaultServiceUnitManager implements ServiceUnitManager 472 { 473 public String deploy(String string, String string1) throws DeploymentException 474 { 475 return null; 476 } 477 478 public void init(String string, String string1) throws DeploymentException 479 { 480 } 482 483 public void start(String string) throws DeploymentException 484 { 485 } 487 488 public void stop(String string) throws DeploymentException 489 { 490 } 492 493 public void shutDown(String string) throws DeploymentException 494 { 495 } 497 498 public String undeploy(String string, String string1) throws DeploymentException 499 { 500 return null; 501 } 502 } 503 } 504 | Popular Tags |