1 17 package org.apache.servicemix.common; 18 19 import java.lang.reflect.Method ; 20 import java.util.Map ; 21 22 import javax.jbi.JBIException; 23 import javax.jbi.component.ComponentContext; 24 import javax.jbi.component.ComponentLifeCycle; 25 import javax.jbi.messaging.DeliveryChannel; 26 import javax.jbi.messaging.ExchangeStatus; 27 import javax.jbi.messaging.MessageExchange; 28 import javax.jbi.messaging.MessagingException; 29 import javax.jbi.messaging.MessageExchange.Role; 30 import javax.jbi.servicedesc.ServiceEndpoint; 31 import javax.management.MBeanServer ; 32 import javax.management.ObjectName ; 33 import javax.resource.spi.work.Work ; 34 import javax.resource.spi.work.WorkManager ; 35 import javax.transaction.Status ; 36 import javax.transaction.Transaction ; 37 import javax.transaction.TransactionManager ; 38 import javax.xml.namespace.QName ; 39 40 import org.apache.commons.logging.Log; 41 import org.apache.servicemix.JbiConstants; 42 43 import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap; 44 import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean; 45 46 54 public class AsyncBaseLifeCycle implements ComponentLifeCycle { 55 56 protected final transient Log logger; 57 58 protected BaseComponent component; 59 protected ComponentContext context; 60 protected ObjectName mbeanName; 61 protected WorkManager workManager; 62 protected AtomicBoolean running; 63 protected DeliveryChannel channel; 64 protected Thread poller; 65 protected AtomicBoolean polling; 66 protected TransactionManager transactionManager; 67 protected boolean workManagerCreated; 68 protected Map processors = new ConcurrentHashMap(); 69 70 71 public AsyncBaseLifeCycle(BaseComponent component) { 72 this.component = component; 73 this.logger = component.logger; 74 this.running = new AtomicBoolean(false); 75 this.polling = new AtomicBoolean(false); 76 this.processors = new ConcurrentHashMap(); 77 } 78 79 82 public ObjectName getExtensionMBeanName() { 83 return mbeanName; 84 } 85 86 protected Object getExtensionMBean() throws Exception { 87 return null; 88 } 89 90 protected ObjectName createExtensionMBeanName() throws Exception { 91 return this.context.getMBeanNames().createCustomComponentMBeanName("Configuration"); 92 } 93 94 protected QName getEPRServiceName() { 95 return null; 96 } 97 98 101 public void init(ComponentContext context) throws JBIException { 102 try { 103 if (logger.isDebugEnabled()) { 104 logger.debug("Initializing component"); 105 } 106 this.context = context; 107 this.channel = context.getDeliveryChannel(); 108 try { 109 this.transactionManager = (TransactionManager ) context.getTransactionManager(); 110 } catch (Throwable e) { 111 } 115 doInit(); 116 if (logger.isDebugEnabled()) { 117 logger.debug("Component initialized"); 118 } 119 } catch (JBIException e) { 120 throw e; 121 } catch (Exception e) { 122 throw new JBIException("Error calling init", e); 123 } 124 } 125 126 protected void doInit() throws Exception { 127 Object mbean = getExtensionMBean(); 129 if (mbean != null) { 130 MBeanServer server = this.context.getMBeanServer(); 131 if (server == null) { 132 } else { 135 this.mbeanName = createExtensionMBeanName(); 136 if (server.isRegistered(this.mbeanName)) { 137 server.unregisterMBean(this.mbeanName); 138 } 139 server.registerMBean(mbean, this.mbeanName); 140 } 141 } 142 if (this.workManager == null) { 151 this.workManagerCreated = true; 152 this.workManager = createWorkManager(); 153 } 154 } 155 156 159 public void shutDown() throws JBIException { 160 try { 161 if (logger.isDebugEnabled()) { 162 logger.debug("Shutting down component"); 163 } 164 doShutDown(); 165 this.context = null; 166 if (logger.isDebugEnabled()) { 167 logger.debug("Component shut down"); 168 } 169 } catch (JBIException e) { 170 throw e; 171 } catch (Exception e) { 172 throw new JBIException("Error calling shutdown", e); 173 } 174 } 175 176 protected void doShutDown() throws Exception { 177 if (this.mbeanName != null) { 179 MBeanServer server = this.context.getMBeanServer(); 180 if (server == null) { 181 throw new JBIException("null mBeanServer"); 182 } 183 if (server.isRegistered(this.mbeanName)) { 184 server.unregisterMBean(this.mbeanName); 185 } 186 } 187 if (this.workManagerCreated) { 189 if (this.workManager instanceof BasicWorkManager) { 190 ((BasicWorkManager) this.workManager).shutDown(); 191 } 192 this.workManager = null; 193 } 194 } 195 196 199 public void start() throws JBIException { 200 try { 201 if (logger.isDebugEnabled()) { 202 logger.debug("Starting component"); 203 } 204 if (this.running.compareAndSet(false, true)) { 205 doStart(); 206 } 207 if (logger.isDebugEnabled()) { 208 logger.debug("Component started"); 209 } 210 } catch (JBIException e) { 211 throw e; 212 } catch (Exception e) { 213 throw new JBIException("Error calling start", e); 214 } 215 } 216 217 protected void doStart() throws Exception { 218 synchronized (this.polling) { 219 workManager.startWork(new Work () { 220 public void release() { } 221 public void run() { 222 poller = Thread.currentThread(); 223 pollDeliveryChannel(); 224 } 225 }); 226 polling.wait(); 227 } 228 } 229 230 protected void pollDeliveryChannel() { 231 synchronized (polling) { 232 polling.set(true); 233 polling.notify(); 234 } 235 while (running.get()) { 236 try { 237 final MessageExchange exchange = channel.accept(1000L); 238 if (exchange != null) { 239 final Transaction tx = (Transaction ) exchange.getProperty(MessageExchange.JTA_TRANSACTION_PROPERTY_NAME); 240 if (tx != null) { 241 if (transactionManager == null) { 242 throw new IllegalStateException ("Exchange is enlisted in a transaction, but no transaction manager is available"); 243 } 244 transactionManager.suspend(); 245 } 246 workManager.scheduleWork(new Work () { 247 public void release() { 248 } 249 public void run() { 250 processExchangeInTx(exchange, tx); 251 } 252 }); 253 } 254 } catch (Throwable t) { 255 if (running.get() == false) { 256 if (logger.isDebugEnabled()) { 258 logger.debug("Polling thread will stop"); 259 } 260 } else { 261 logger.error("Error polling delivery channel", t); 262 } 263 } 264 } 265 synchronized (polling) { 266 polling.set(false); 267 polling.notify(); 268 } 269 } 270 271 274 public void stop() throws JBIException { 275 try { 276 if (logger.isDebugEnabled()) { 277 logger.debug("Stopping component"); 278 } 279 if (this.running.compareAndSet(true, false)) { 280 doStop(); 281 } 282 if (logger.isDebugEnabled()) { 283 logger.debug("Component stopped"); 284 } 285 } catch (JBIException e) { 286 throw e; 287 } catch (Exception e) { 288 throw new JBIException("Error calling stop", e); 289 } 290 } 291 292 protected void doStop() throws Exception { 293 try { 295 synchronized (polling) { 296 if (polling.get()) { 297 poller.interrupt(); 298 polling.wait(); 299 } 300 } 301 } finally { 302 poller = null; 303 } 304 } 305 306 309 public ComponentContext getContext() { 310 return context; 311 } 312 313 public WorkManager getWorkManager() { 314 return workManager; 315 } 316 317 protected WorkManager createWorkManager() { 318 return new BasicWorkManager(); 320 } 321 322 protected WorkManager findWorkManager() { 323 try { 325 Method getContainerMth = context.getClass().getMethod("getContainer", new Class [0]); 326 Object container = getContainerMth.invoke(context, new Object [0]); 327 Method getWorkManagerMth = container.getClass().getMethod("getWorkManager", new Class [0]); 328 return (WorkManager ) getWorkManagerMth.invoke(container, new Object [0]); 329 } catch (Throwable t) { 330 if (logger.isDebugEnabled()) { 331 logger.debug("JBI container is not ServiceMix. Will create our own WorkManager", t); 332 } 333 } 334 return null; 336 } 337 338 protected void processExchangeInTx(MessageExchange exchange, Transaction tx) { 339 try { 340 if (tx != null) { 341 transactionManager.resume(tx); 342 } 343 processExchange(exchange); 344 } catch (Exception e) { 345 logger.error("Error processing exchange " + exchange, e); 346 try { 347 if (transactionManager != null && 350 transactionManager.getStatus() == Status.STATUS_ACTIVE && 351 exceptionShouldRollbackTx(e)) { 352 transactionManager.setRollbackOnly(); 353 } 354 exchange.setError(e); 355 channel.send(exchange); 356 } catch (Exception inner) { 357 logger.error("Error setting exchange status to ERROR", inner); 358 } 359 } finally { 360 try { 361 if (tx != null) { 363 int status = transactionManager.getStatus(); 364 if (status != Status.STATUS_NO_TRANSACTION) { 368 logger.error("Transaction is still active after exchange processing. Trying to rollback transaction."); 369 try { 370 transactionManager.rollback(); 371 } catch (Throwable t) { 372 logger.error("Error trying to rollback transaction.", t); 373 } 374 } 375 } 376 } catch (Throwable t) { 377 logger.error("Error checking transaction status.", t); 378 } 379 } 380 } 381 382 protected boolean exceptionShouldRollbackTx(Exception e) { 383 return false; 384 } 385 386 public void processExchange(MessageExchange exchange) throws Exception { 387 if (logger.isDebugEnabled()) { 388 logger.debug("Received exchange: status: " + exchange.getStatus() + ", role: " + 389 (exchange.getRole() == Role.CONSUMER ? "consumer" : "provider")); 390 } 391 if (exchange.getRole() == Role.PROVIDER) { 392 boolean dynamic = false; 393 ServiceEndpoint endpoint = exchange.getEndpoint(); 394 String key = EndpointSupport.getKey(exchange.getEndpoint()); 395 Endpoint ep = (Endpoint) this.component.getRegistry().getEndpoint(key); 396 if (ep == null) { 397 if (endpoint.getServiceName().equals(getEPRServiceName())) { 398 ep = getResolvedEPR(exchange.getEndpoint()); 399 dynamic = true; 400 } 401 if (ep == null) { 402 throw new IllegalStateException ("Endpoint not found: " + key); 403 } 404 } 405 ExchangeProcessor processor = ep.getProcessor(); 406 if (processor == null) { 407 throw new IllegalStateException ("No processor found for endpoint: " + key); 408 } 409 try { 410 processor.process(exchange); 411 } finally { 412 if (dynamic) { 414 ep.deactivate(); 415 } 416 } 417 } else { 418 ExchangeProcessor processor = null; 419 if (exchange.getProperty(JbiConstants.SENDER_ENDPOINT) != null) { 420 String key = exchange.getProperty(JbiConstants.SENDER_ENDPOINT).toString(); 421 Endpoint ep = (Endpoint) this.component.getRegistry().getEndpoint(key); 422 if (ep != null) { 423 processor = ep.getProcessor(); 424 } 425 } else { 426 processor = (ExchangeProcessor) processors.remove(exchange.getExchangeId()); 427 } 428 if (processor == null) { 429 throw new IllegalStateException ("No processor found for: " + exchange.getExchangeId()); 430 } 431 processor.process(exchange); 432 } 433 } 434 435 442 public void sendConsumerExchange(MessageExchange exchange, ExchangeProcessor processor) throws MessagingException { 443 if (exchange.getStatus() == ExchangeStatus.ACTIVE) { 445 processors.put(exchange.getExchangeId(), processor); 446 } 447 channel.send(exchange); 448 } 449 450 457 public void sendConsumerExchange(MessageExchange exchange, Endpoint endpoint) throws MessagingException { 458 String key = EndpointSupport.getKey(endpoint); 459 exchange.setProperty(JbiConstants.SENDER_ENDPOINT, key); 460 channel.send(exchange); 461 } 462 463 469 protected Endpoint getResolvedEPR(ServiceEndpoint ep) throws Exception { 470 throw new UnsupportedOperationException ("Component does not handle EPR exchanges"); 471 } 472 473 } 474 | Popular Tags |