1 22 package org.jboss.resource.adapter.jms; 23 24 import java.io.PrintWriter ; 25 import java.util.Collections ; 26 import java.util.HashSet ; 27 import java.util.Iterator ; 28 import java.util.Set ; 29 import java.util.Vector ; 30 31 import javax.jms.Connection ; 32 import javax.jms.ExceptionListener ; 33 import javax.jms.JMSException ; 34 import javax.jms.QueueConnection ; 35 import javax.jms.QueueSession ; 36 import javax.jms.Session ; 37 import javax.jms.TopicConnection ; 38 import javax.jms.TopicSession ; 39 import javax.jms.XAConnection ; 40 import javax.jms.XAQueueConnection ; 41 import javax.jms.XAQueueSession ; 42 import javax.jms.XASession ; 43 import javax.jms.XATopicConnection ; 44 import javax.jms.XATopicSession ; 45 import javax.naming.Context ; 46 import javax.naming.InitialContext ; 47 import javax.naming.NamingException ; 48 import javax.resource.NotSupportedException ; 49 import javax.resource.ResourceException ; 50 import javax.resource.spi.ConnectionEvent ; 51 import javax.resource.spi.ConnectionEventListener ; 52 import javax.resource.spi.ConnectionRequestInfo ; 53 import javax.resource.spi.IllegalStateException ; 54 import javax.resource.spi.LocalTransaction ; 55 import javax.resource.spi.ManagedConnection ; 56 import javax.resource.spi.ManagedConnectionMetaData ; 57 import javax.resource.spi.SecurityException ; 58 import javax.security.auth.Subject ; 59 import javax.transaction.xa.XAResource ; 60 61 import org.jboss.jms.ConnectionFactoryHelper; 62 import org.jboss.jms.jndi.JMSProviderAdapter; 63 import org.jboss.logging.Logger; 64 import org.jboss.resource.JBossResourceException; 65 66 133 public class JmsManagedConnection 134 implements ManagedConnection , ExceptionListener 135 { 136 private static final Logger log = Logger.getLogger(JmsManagedConnection.class); 137 138 private JmsManagedConnectionFactory mcf; 139 private JmsConnectionRequestInfo info; 140 private String user; 141 private String pwd; 142 private boolean isDestroyed; 143 144 private Connection con; 146 private Session session; 147 private TopicSession topicSession; 148 private QueueSession queueSession; 149 private XASession xaSession; 150 private XATopicSession xaTopicSession; 151 private XAQueueSession xaQueueSession; 152 private XAResource xaResource; 153 private boolean xaTransacted; 154 155 156 private Set handles = Collections.synchronizedSet(new HashSet ()); 157 158 159 private Vector listeners = new Vector (); 160 161 171 public JmsManagedConnection(final JmsManagedConnectionFactory mcf, 172 final ConnectionRequestInfo info, 173 final String user, 174 final String pwd) 175 throws ResourceException 176 { 177 this.mcf = mcf; 178 179 this.info = (JmsConnectionRequestInfo)info; 181 this.user = user; 182 this.pwd = pwd; 183 184 setup(); 185 } 186 187 189 207 public Object getConnection(final Subject subject, 208 final ConnectionRequestInfo info) 209 throws ResourceException 210 { 211 JmsCred cred = JmsCred.getJmsCred(mcf,subject,info); 213 214 if (user != null && !user.equals(cred.name)) 216 throw new SecurityException 217 ("Password credentials not the same, reauthentication not allowed"); 218 if (cred.name != null && user == null) { 219 throw new SecurityException 220 ("Password credentials not the same, reauthentication not allowed"); 221 } 222 223 user = cred.name; 225 if (isDestroyed) 226 throw new IllegalStateException ("ManagedConnection already destroyd"); 227 228 JmsSession handle = new JmsSession(this, (JmsConnectionRequestInfo) info); 230 handles.add(handle); 231 return handle; 232 } 233 234 239 private void destroyHandles() throws ResourceException 240 { 241 try 242 { 243 if (con != null) 244 con.stop(); 245 } 246 catch (Throwable t) 247 { 248 log.trace("Ignored error stopping connection", t); 249 } 250 251 Iterator iter = handles.iterator(); 252 while (iter.hasNext()) 253 ((JmsSession)iter.next()).destroy(); 254 255 handles.clear(); 257 } 258 259 265 public void destroy() throws ResourceException 266 { 267 if (isDestroyed) return; 268 269 isDestroyed = true; 270 271 try 272 { 273 con.setExceptionListener(null); 274 } 275 catch (JMSException e) 276 { 277 log.debug("Error unsetting the exception listener " + this, e); 278 } 279 280 destroyHandles(); 282 283 try 284 { 285 try 287 { 288 if (info.getType() == JmsConnectionFactory.TOPIC) 289 { 290 topicSession.close(); 291 if (xaTransacted) { 292 xaTopicSession.close(); 293 } 294 } 295 else if (info.getType() == JmsConnectionFactory.QUEUE) 296 { 297 queueSession.close(); 298 if (xaTransacted) 299 xaQueueSession.close(); 300 } 301 else 302 { 303 session.close(); 304 if (xaTransacted) 305 xaSession.close(); 306 } 307 } 308 catch (JMSException e) 309 { 310 log.debug("Error closing session " +this, e); 311 } 312 con.close(); 313 } 314 catch (JMSException e) 315 { 316 throw new JBossResourceException 317 ("Could not properly close the session and connection", e); 318 } 319 } 320 321 328 public void cleanup() throws ResourceException 329 { 330 if (isDestroyed) 331 throw new IllegalStateException ("ManagedConnection already destroyed"); 332 333 destroyHandles(); 335 } 336 337 345 public void associateConnection(final Object obj) 346 throws ResourceException 347 { 348 352 if (!isDestroyed && obj instanceof JmsSession) 353 { 354 JmsSession h = (JmsSession)obj; 355 h.setManagedConnection(this); 356 handles.add(h); 357 } 358 else 359 throw new IllegalStateException 360 ("ManagedConnection in an illegal state"); 361 } 362 363 368 public void addConnectionEventListener(final ConnectionEventListener l) 369 { 370 listeners.addElement(l); 371 372 if (log.isTraceEnabled()) 373 log.trace("ConnectionEvent listener added: " + l); 374 } 375 376 381 public void removeConnectionEventListener(final ConnectionEventListener l) 382 { 383 listeners.removeElement(l); 384 } 385 386 393 public XAResource getXAResource() throws ResourceException 394 { 395 if (!xaTransacted) 400 throw new NotSupportedException ("Non XA transaction not supported"); 401 402 if (xaResource == null) 403 { 404 if (info.getType() == JmsConnectionFactory.TOPIC) 405 xaResource = xaTopicSession.getXAResource(); 406 else if (info.getType() == JmsConnectionFactory.QUEUE) 407 xaResource = xaQueueSession.getXAResource(); 408 else 409 xaResource = xaSession.getXAResource(); 410 } 411 412 if (log.isTraceEnabled()) 413 log.trace("XAResource=" + xaResource); 414 415 return xaResource; 416 } 417 418 425 public LocalTransaction getLocalTransaction() throws ResourceException 426 { 427 LocalTransaction tx = new JmsLocalTransaction(this); 428 if (log.isTraceEnabled()) 429 log.trace("LocalTransaction=" + tx); 430 return tx; 431 } 432 433 441 public ManagedConnectionMetaData getMetaData() throws ResourceException 442 { 443 if (isDestroyed) 444 throw new IllegalStateException ("ManagedConnection already destroyd"); 445 446 return new JmsMetaData(this); 447 } 448 449 456 public void setLogWriter(final PrintWriter out) throws ResourceException 457 { 458 } 462 463 468 public PrintWriter getLogWriter() throws ResourceException 469 { 470 474 return null; 475 } 476 477 479 public void onException(JMSException exception) 480 { 481 if (isDestroyed) 482 { 483 if (log.isTraceEnabled()) 484 log.trace("Ignoring error on already destroyed connection " + this, exception); 485 return; 486 } 487 488 log.warn("Handling jms exception failure: " + this, exception); 489 490 try 491 { 492 con.setExceptionListener(null); 493 } 494 catch (JMSException e) 495 { 496 log.debug("Unable to unset exception listener", e); 497 } 498 499 ConnectionEvent event = new ConnectionEvent (this, ConnectionEvent.CONNECTION_ERROR_OCCURRED, exception); 500 sendEvent(event); 501 } 502 503 505 510 protected Session getSession() 511 { 512 if (info.getType() == JmsConnectionFactory.TOPIC) 513 return topicSession; 514 else if (info.getType() == JmsConnectionFactory.QUEUE) 515 return queueSession; 516 else 517 return session; 518 } 519 520 525 protected void sendEvent(final ConnectionEvent event) 526 { 527 int type = event.getId(); 528 529 if (log.isTraceEnabled()) 530 log.trace("Sending connection event: " + type); 531 532 ConnectionEventListener [] list = 534 (ConnectionEventListener [])listeners.toArray(new ConnectionEventListener [listeners.size()]); 535 536 for (int i=0; i<list.length; i++) 537 { 538 switch (type) { 539 case ConnectionEvent.CONNECTION_CLOSED: 540 list[i].connectionClosed(event); 541 break; 542 543 case ConnectionEvent.LOCAL_TRANSACTION_STARTED: 544 list[i].localTransactionStarted(event); 545 break; 546 547 case ConnectionEvent.LOCAL_TRANSACTION_COMMITTED: 548 list[i].localTransactionCommitted(event); 549 break; 550 551 case ConnectionEvent.LOCAL_TRANSACTION_ROLLEDBACK: 552 list[i].localTransactionRolledback(event); 553 break; 554 555 case ConnectionEvent.CONNECTION_ERROR_OCCURRED: 556 list[i].connectionErrorOccurred(event); 557 break; 558 559 default: 560 throw new IllegalArgumentException ("Illegal eventType: " + type); 561 } 562 } 563 } 564 565 570 protected void removeHandle(final JmsSession handle) 571 { 572 handles.remove(handle); 573 } 574 575 577 582 protected ConnectionRequestInfo getInfo() 583 { 584 return info; 585 } 586 587 592 protected JmsManagedConnectionFactory getManagedConnectionFactory() 593 { 594 return mcf; 595 } 596 597 void start() throws JMSException 598 { 599 con.start(); 600 } 601 602 void stop() throws JMSException 603 { 604 con.stop(); 605 } 606 607 609 614 protected String getUserName() 615 { 616 return user; 617 } 618 619 621 629 private JMSProviderAdapter getProviderAdapter() throws NamingException 630 { 631 JMSProviderAdapter adapter; 632 633 if (mcf.getJmsProviderAdapterJNDI() != null) 634 { 635 Context ctx = new InitialContext (); 637 try 638 { 639 adapter = (JMSProviderAdapter) 640 ctx.lookup(mcf.getJmsProviderAdapterJNDI()); 641 } 642 finally 643 { 644 ctx.close(); 645 } 646 } 647 else 648 adapter = mcf.getJmsProviderAdapter(); 649 650 return adapter; 651 } 652 653 658 private void setup() throws ResourceException 659 { 660 boolean trace = log.isTraceEnabled(); 661 662 try 663 { 664 JMSProviderAdapter adapter = getProviderAdapter(); 665 Context context = adapter.getInitialContext(); 666 Object factory; 667 boolean transacted = info.isTransacted(); 668 int ack = Session.AUTO_ACKNOWLEDGE; 669 670 if (info.getType() == JmsConnectionFactory.TOPIC) 671 { 672 String jndi = adapter.getTopicFactoryRef(); 673 if (jndi == null) 674 throw new IllegalStateException ("No configured 'TopicFactoryRef' on the jms provider " + mcf.getJmsProviderAdapterJNDI()); 675 factory = context.lookup(jndi); 676 con = ConnectionFactoryHelper.createTopicConnection(factory, user, pwd); 677 if (info.getClientID() != null) 678 con.setClientID(info.getClientID()); 679 con.setExceptionListener(this); 680 if (trace) 681 log.trace("created connection: " + con); 682 683 if (con instanceof XATopicConnection ) 684 { 685 xaTopicSession = ((XATopicConnection )con).createXATopicSession(); 686 topicSession = xaTopicSession.getTopicSession(); 687 xaTransacted = true; 688 } 689 else if (con instanceof TopicConnection ) 690 { 691 topicSession = 692 ((TopicConnection )con).createTopicSession(transacted, ack); 693 if (trace) 694 log.trace("Using a non-XA TopicConnection. " + 695 "It will not be able to participate in a Global UOW"); 696 } 697 else 698 throw new JBossResourceException("Connection was not recognizable: " + con); 699 700 if (trace) 701 log.trace("xaTopicSession=" + xaTopicSession + ", topicSession=" + topicSession); 702 } 703 else if (info.getType() == JmsConnectionFactory.QUEUE) 704 { 705 String jndi = adapter.getQueueFactoryRef(); 706 if (jndi == null) 707 throw new IllegalStateException ("No configured 'QueueFactoryRef' on the jms provider " + mcf.getJmsProviderAdapterJNDI()); 708 factory = context.lookup(jndi); 709 con = ConnectionFactoryHelper.createQueueConnection(factory, user, pwd); 710 if (info.getClientID() != null) 711 con.setClientID(info.getClientID()); 712 con.setExceptionListener(this); 713 if (trace) 714 log.debug("created connection: " + con); 715 716 if (con instanceof XAQueueConnection ) 717 { 718 xaQueueSession = 719 ((XAQueueConnection )con).createXAQueueSession(); 720 queueSession = xaQueueSession.getQueueSession(); 721 xaTransacted = true; 722 } 723 else if (con instanceof QueueConnection ) 724 { 725 queueSession = 726 ((QueueConnection )con).createQueueSession(transacted, ack); 727 if (trace) 728 log.trace("Using a non-XA QueueConnection. " + 729 "It will not be able to participate in a Global UOW"); 730 } 731 else 732 throw new JBossResourceException("Connection was not reconizable: " + con); 733 734 if (trace) 735 log.trace("xaQueueSession=" + xaQueueSession + ", queueSession=" + queueSession); 736 } 737 else 738 { 739 String jndi = adapter.getFactoryRef(); 740 if (jndi == null) 741 throw new IllegalStateException ("No configured 'FactoryRef' on the jms provider " + mcf.getJmsProviderAdapterJNDI()); 742 factory = context.lookup(jndi); 743 con = ConnectionFactoryHelper.createConnection(factory, user, pwd); 744 if (info.getClientID() != null) 745 con.setClientID(info.getClientID()); 746 con.setExceptionListener(this); 747 if (trace) 748 log.trace("created connection: " + con); 749 750 if (con instanceof XAConnection ) 751 { 752 xaSession = 753 ((XAConnection )con).createXASession(); 754 session = xaSession.getSession(); 755 xaTransacted = true; 756 } 757 else 758 { 759 session = con.createSession(transacted, ack); 760 if (trace) 761 log.trace("Using a non-XA Connection. " + 762 "It will not be able to participate in a Global UOW"); 763 } 764 765 if (trace) 766 log.debug("xaSession=" + xaQueueSession + ", Session=" + session); 767 } 768 769 if (trace) 770 log.debug("transacted=" + transacted + ", ack=" + ack); 771 } 772 catch (NamingException e) 773 { 774 throw new JBossResourceException("Unable to setup connection", e); 775 } 776 catch (JMSException e) 777 { 778 throw new JBossResourceException("Unable to setup connection", e); 779 } 780 } 781 } 782 | Popular Tags |