1 22 package org.jboss.resource.adapter.jms.inflow; 23 24 import java.lang.reflect.Method ; 25 26 import javax.jms.Connection ; 27 import javax.jms.Destination ; 28 import javax.jms.ExceptionListener ; 29 import javax.jms.JMSException ; 30 import javax.jms.Message ; 31 import javax.jms.MessageListener ; 32 import javax.jms.Queue ; 33 import javax.jms.QueueConnection ; 34 import javax.jms.QueueConnectionFactory ; 35 import javax.jms.Topic ; 36 import javax.jms.TopicConnection ; 37 import javax.jms.TopicConnectionFactory ; 38 import javax.jms.XAQueueConnectionFactory ; 39 import javax.jms.XATopicConnectionFactory ; 40 import javax.naming.Context ; 41 import javax.resource.ResourceException ; 42 import javax.resource.spi.endpoint.MessageEndpointFactory ; 43 import javax.resource.spi.work.Work ; 44 import javax.resource.spi.work.WorkManager ; 45 import javax.transaction.TransactionManager ; 46 47 import org.jboss.jms.jndi.JMSProviderAdapter; 48 import org.jboss.logging.Logger; 49 import org.jboss.resource.adapter.jms.JmsResourceAdapter; 50 import org.jboss.tm.TransactionManagerLocator; 51 import org.jboss.util.Strings; 52 import org.jboss.util.naming.Util; 53 54 import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean; 55 56 62 public class JmsActivation implements ExceptionListener 63 { 64 65 private static final Logger log = Logger.getLogger(JmsActivation.class); 66 67 68 public static final Method ONMESSAGE; 69 70 71 protected JmsResourceAdapter ra; 72 73 74 protected JmsActivationSpec spec; 75 76 77 protected MessageEndpointFactory endpointFactory; 78 79 80 protected SynchronizedBoolean deliveryActive; 81 82 83 protected JMSProviderAdapter adapter; 84 85 86 protected Destination destination; 87 88 89 protected Connection connection; 90 91 92 protected JmsServerSessionPool pool; 93 94 95 protected boolean isDeliveryTransacted; 96 97 98 protected DLQHandler dlqHandler; 99 100 101 protected TransactionManager tm; 102 103 104 static 105 { 106 try 107 { 108 ONMESSAGE = MessageListener .class.getMethod("onMessage", new Class [] { Message .class }); 109 } 110 catch (Exception e) 111 { 112 throw new RuntimeException (e); 113 } 114 } 115 116 public JmsActivation(JmsResourceAdapter ra, MessageEndpointFactory endpointFactory, JmsActivationSpec spec) throws ResourceException 117 { 118 this.ra = ra; 119 this.endpointFactory = endpointFactory; 120 this.spec = spec; 121 try 122 { 123 this.isDeliveryTransacted = endpointFactory.isDeliveryTransacted(ONMESSAGE); 124 } 125 catch (Exception e) 126 { 127 throw new ResourceException (e); 128 } 129 } 130 131 134 public JmsActivationSpec getActivationSpec() 135 { 136 return spec; 137 } 138 139 142 public MessageEndpointFactory getMessageEndpointFactory() 143 { 144 return endpointFactory; 145 } 146 147 150 public boolean isDeliveryTransacted() 151 { 152 return isDeliveryTransacted; 153 } 154 155 158 public WorkManager getWorkManager() 159 { 160 return ra.getWorkManager(); 161 } 162 163 public TransactionManager getTransactionManager() 164 { 165 if (tm == null) 166 { 167 tm = TransactionManagerLocator.getInstance().locate(); 168 169 } 170 171 return tm; 172 } 173 174 177 public Connection getConnection() 178 { 179 return connection; 180 } 181 182 185 public Destination getDestination() 186 { 187 return destination; 188 } 189 190 193 public JMSProviderAdapter getProviderAdapter() 194 { 195 return adapter; 196 } 197 198 201 public DLQHandler getDLQHandler() 202 { 203 return dlqHandler; 204 } 205 206 211 public void start() throws ResourceException 212 { 213 deliveryActive = new SynchronizedBoolean(true); 214 ra.getWorkManager().scheduleWork(new SetupActivation()); 215 } 216 217 220 public void stop() 221 { 222 deliveryActive.set(false); 223 teardown(); 224 } 225 226 229 public void handleFailure(Throwable failure) 230 { 231 log.warn("Failure in jms activation " + spec, failure); 232 int reconnectCount = 0; 233 234 while (deliveryActive.get() && reconnectCount < spec.getReconnectAttempts()) 235 { 236 teardown(); 237 try 238 { 239 Thread.sleep(spec.getReconnectIntervalLong()); 240 } 241 catch (InterruptedException e) 242 { 243 log.debug("Interrupted trying to reconnect " + spec, e); 244 break; 245 } 246 247 log.info("Attempting to reconnect " + spec); 248 try 249 { 250 setup(); 251 log.info("Reconnected with messaging provider."); 252 break; 253 } 254 catch (Throwable t) 255 { 256 log.error("Unable to reconnect " + spec, t); 257 } 258 259 ++reconnectCount; 260 261 } 262 } 263 264 public void onException(JMSException exception) 265 { 266 handleFailure(exception); 267 } 268 269 public String toString() 270 { 271 StringBuffer buffer = new StringBuffer (); 272 buffer.append(Strings.defaultToString(this)).append('('); 273 buffer.append("spec=").append(Strings.defaultToString(spec)); 274 buffer.append(" mepf=").append(Strings.defaultToString(endpointFactory)); 275 buffer.append(" active=").append(deliveryActive.get()); 276 if (destination != null) 277 buffer.append(" destination=").append(destination); 278 if (connection != null) 279 buffer.append(" connection=").append(connection); 280 if (pool != null) 281 buffer.append(" pool=").append(Strings.defaultToString(pool)); 282 if (dlqHandler != null) 283 buffer.append(" dlq=").append(Strings.defaultToString(dlqHandler)); 284 buffer.append(" transacted=").append(isDeliveryTransacted); 285 buffer.append(')'); 286 return buffer.toString(); 287 } 288 289 294 protected void setup() throws Exception 295 { 296 log.debug("Setting up " + spec); 297 298 setupJMSProviderAdapter(); 299 Context ctx = adapter.getInitialContext(); 300 log.debug("Using context " + ctx.getEnvironment() + " for " + spec); 301 try 302 { 303 setupDLQ(ctx); 304 setupDestination(ctx); 305 setupConnection(ctx); 306 } 307 finally 308 { 309 ctx.close(); 310 } 311 setupSessionPool(); 312 313 log.debug("Setup complete " + this); 314 } 315 316 319 protected void teardown() 320 { 321 log.debug("Tearing down " + spec); 322 323 teardownSessionPool(); 324 teardownConnection(); 325 teardownDestination(); 326 teardownDLQ(); 327 328 log.debug("Tearing down complete " + this); 329 } 330 331 334 protected void setupJMSProviderAdapter() throws Exception 335 { 336 String providerAdapterJNDI = spec.getProviderAdapterJNDI(); 337 if (providerAdapterJNDI.startsWith("java:") == false) 338 providerAdapterJNDI = "java:" + providerAdapterJNDI; 339 340 log.debug("Retrieving the jms provider adapter " + providerAdapterJNDI + " for " + this); 341 adapter = (JMSProviderAdapter) Util.lookup(providerAdapterJNDI, JMSProviderAdapter.class); 342 log.debug("Using jms provider adapter " + adapter + " for " + this); 343 } 344 345 351 protected void setupDLQ(Context ctx) throws Exception 352 { 353 if (spec.isUseDLQ()) 354 { 355 Class clazz = Thread.currentThread().getContextClassLoader().loadClass(spec.getDLQHandler()); 356 dlqHandler = (DLQHandler) clazz.newInstance(); 357 dlqHandler.setup(this, ctx); 358 } 359 360 log.debug("Setup DLQ " + this); 361 } 362 363 366 protected void teardownDLQ() 367 { 368 log.debug("Removing DLQ " + this); 369 try 370 { 371 if (dlqHandler != null) 372 dlqHandler.teardown(); 373 } 374 catch (Throwable t) 375 { 376 log.debug("Error tearing down the DLQ " + dlqHandler, t); 377 } 378 dlqHandler = null; 379 } 380 381 387 protected void setupDestination(Context ctx) throws Exception 388 { 389 Class destinationType; 390 if (spec.isTopic()) 391 destinationType = Topic .class; 392 else 393 destinationType = Queue .class; 394 395 String destinationName = spec.getDestination(); 396 log.debug("Retrieving destination " + destinationName + " of type " + destinationType.getName()); 397 destination = (Destination ) Util.lookup(ctx, destinationName, destinationType); 398 log.debug("Got destination " + destination + " from " + destinationName); 399 } 400 401 404 protected void teardownDestination() 405 { 406 } 407 408 414 protected void setupConnection(Context ctx) throws Exception 415 { 416 log.debug("setup connection " + this); 417 418 String user = spec.getUser(); 419 String pass = spec.getPassword(); 420 String clientID = spec.getClientId(); 421 if (spec.isTopic()) 422 connection = setupTopicConnection(ctx, user, pass, clientID); 423 else 424 connection = setupQueueConnection(ctx, user, pass, clientID); 425 426 log.debug("established connection " + this); 427 } 428 429 438 protected QueueConnection setupQueueConnection(Context ctx, String user, String pass, String clientID) throws Exception 439 { 440 String queueFactoryRef = adapter.getQueueFactoryRef(); 441 log.debug("Attempting to lookup queue connection factory " + queueFactoryRef); 442 QueueConnectionFactory qcf = (QueueConnectionFactory ) Util.lookup(ctx, queueFactoryRef, QueueConnectionFactory .class); 443 log.debug("Got queue connection factory " + qcf + " from " + queueFactoryRef); 444 log.debug("Attempting to create queue connection with user " + user); 445 QueueConnection result; 446 if (qcf instanceof XAQueueConnectionFactory && isDeliveryTransacted) 447 { 448 XAQueueConnectionFactory xaqcf = (XAQueueConnectionFactory ) qcf; 449 if (user != null) 450 result = xaqcf.createXAQueueConnection(user, pass); 451 else 452 result = xaqcf.createXAQueueConnection(); 453 } 454 else 455 { 456 if (user != null) 457 result = qcf.createQueueConnection(user, pass); 458 else 459 result = qcf.createQueueConnection(); 460 } 461 if (clientID != null) 462 result.setClientID(clientID); 463 result.setExceptionListener(this); 464 log.debug("Using queue connection " + result); 465 return result; 466 } 467 468 477 protected TopicConnection setupTopicConnection(Context ctx, String user, String pass, String clientID) throws Exception 478 { 479 String topicFactoryRef = adapter.getTopicFactoryRef(); 480 log.debug("Attempting to lookup topic connection factory " + topicFactoryRef); 481 TopicConnectionFactory tcf = (TopicConnectionFactory ) Util.lookup(ctx, topicFactoryRef, TopicConnectionFactory .class); 482 log.debug("Got topic connection factory " + tcf + " from " + topicFactoryRef); 483 log.debug("Attempting to create topic connection with user " + user); 484 TopicConnection result; 485 if (tcf instanceof XATopicConnectionFactory && isDeliveryTransacted) 486 { 487 XATopicConnectionFactory xatcf = (XATopicConnectionFactory ) tcf; 488 if (user != null) 489 result = xatcf.createXATopicConnection(user, pass); 490 else 491 result = xatcf.createXATopicConnection(); 492 } 493 else 494 { 495 if (user != null) 496 result = tcf.createTopicConnection(user, pass); 497 else 498 result = tcf.createTopicConnection(); 499 } 500 if (clientID != null) 501 result.setClientID(clientID); 502 result.setExceptionListener(this); 503 log.debug("Using topic connection " + result); 504 return result; 505 } 506 507 510 protected void teardownConnection() 511 { 512 try 513 { 514 if (connection != null) 515 { 516 log.debug("Closing the " + connection); 517 connection.close(); 518 } 519 } 520 catch (Throwable t) 521 { 522 log.debug("Error closing the connection " + connection, t); 523 } 524 connection = null; 525 } 526 527 532 protected void setupSessionPool() throws Exception 533 { 534 pool = new JmsServerSessionPool(this); 535 log.debug("Created session pool " + pool); 536 537 log.debug("Starting session pool " + pool); 538 pool.start(); 539 log.debug("Started session pool " + pool); 540 541 log.debug("Starting delivery " + connection); 542 connection.start(); 543 log.debug("Started delivery " + connection); 544 } 545 546 549 protected void teardownSessionPool() 550 { 551 try 552 { 553 if (connection != null) 554 { 555 log.debug("Stopping delivery " + connection); 556 connection.stop(); 557 } 558 } 559 catch (Throwable t) 560 { 561 log.debug("Error stopping delivery " + connection, t); 562 } 563 564 try 565 { 566 if (pool != null) 567 { 568 log.debug("Stopping the session pool " + pool); 569 pool.stop(); 570 } 571 } 572 catch (Throwable t) 573 { 574 log.debug("Error clearing the pool " + pool, t); 575 } 576 } 577 578 581 private class SetupActivation implements Work 582 { 583 public void run() 584 { 585 try 586 { 587 setup(); 588 } 589 catch (Throwable t) 590 { 591 handleFailure(t); 592 } 593 } 594 595 public void release() 596 { 597 } 598 } 599 } 600 | Popular Tags |