1 25 package org.objectweb.joram.client.connector; 26 27 import javax.jms.JMSException ; 28 import javax.jms.Session ; 29 import javax.jms.XAConnection ; 30 import javax.jms.XAQueueConnection ; 31 import javax.jms.XASession ; 32 import javax.jms.XATopicConnection ; 33 import javax.resource.ResourceException ; 34 import javax.resource.spi.CommException ; 35 import javax.resource.spi.ConnectionEvent ; 36 import javax.resource.spi.ConnectionEventListener ; 37 import javax.resource.spi.ConnectionRequestInfo ; 38 import javax.resource.spi.IllegalStateException ; 39 import javax.resource.spi.LocalTransactionException ; 40 import javax.resource.spi.ManagedConnectionMetaData ; 41 import javax.resource.spi.ResourceAdapterInternalException ; 42 import javax.resource.spi.SecurityException ; 43 import javax.transaction.xa.XAResource ; 44 45 import java.io.PrintWriter ; 46 import java.util.Vector ; 47 48 import org.objectweb.util.monolog.api.BasicLevel; 49 50 55 public class ManagedConnectionImpl 56 implements javax.resource.spi.ManagedConnection , 57 javax.resource.spi.LocalTransaction , 58 javax.jms.ExceptionListener 59 { 60 61 private JoramAdapter ra; 62 63 64 private XAConnection cnx = null; 65 66 private Vector handles; 67 68 private Vector listeners; 69 70 private boolean startedLocalTx = false; 71 72 private ManagedConnectionMetaDataImpl metaData = null; 73 74 private PrintWriter out = null; 75 76 77 private boolean valid = false; 78 79 80 private int hashCode = -1; 81 82 83 String hostName; 84 85 int serverPort; 86 87 String mode; 88 89 String userName; 90 94 Session session = null; 95 96 106 ManagedConnectionImpl(JoramAdapter ra, 107 XAConnection cnx, 108 String hostName, 109 int serverPort, 110 String userName) { 111 if (AdapterTracing.dbgAdapter.isLoggable(BasicLevel.DEBUG)) 112 AdapterTracing.dbgAdapter.log(BasicLevel.DEBUG, "ManagedConnectionImpl(" + ra + 113 ", " + cnx + 114 ", " + hostName + 115 ", " + serverPort + 116 ", " + userName + ")"); 117 118 this.ra = ra; 119 this.cnx = cnx; 120 this.hostName = hostName; 121 this.serverPort = serverPort; 122 this.userName = userName; 123 124 if (cnx instanceof XAQueueConnection ) 125 mode = "PTP"; 126 else if (cnx instanceof XATopicConnection ) 127 mode = "PubSub"; 128 else 129 mode = "Unified"; 130 131 try { 132 cnx.setExceptionListener(this); 133 } 134 catch (JMSException exc) {} 135 136 handles = new Vector (); 137 listeners = new Vector (); 138 139 valid = true; 140 141 hashCode = -1; 142 143 ra.addProducer(this); 144 } 145 146 147 153 public Object getConnection(javax.security.auth.Subject subject, 154 ConnectionRequestInfo cxRequestInfo) 155 throws ResourceException { 156 if (AdapterTracing.dbgAdapter.isLoggable(BasicLevel.DEBUG)) 157 AdapterTracing.dbgAdapter.log(BasicLevel.DEBUG, 158 this + " getConnection(" + subject + 159 ", " + cxRequestInfo + ")"); 160 161 if (! isValid()) { 162 if (out != null) 163 out.print("Physical connection to the underlying JORAM server has been lost."); 164 throw new CommException ("Physical connection to the underlying " 165 + "JORAM server has been lost."); 166 } 167 168 OutboundConnection handle; 169 170 if (cnx instanceof XAQueueConnection ) 171 handle = new OutboundQueueConnection(this, (XAQueueConnection ) cnx); 172 else if (cnx instanceof XATopicConnection ) 173 handle = new OutboundTopicConnection(this, (XATopicConnection ) cnx); 174 else 175 handle = new OutboundConnection(this, cnx); 176 177 handles.add(handle); 178 179 if (AdapterTracing.dbgAdapter.isLoggable(BasicLevel.DEBUG)) 180 AdapterTracing.dbgAdapter.log(BasicLevel.DEBUG, 181 this + " getConnection handles = " + handles); 182 return handle; 183 } 184 185 192 public void associateConnection(Object connection) 193 throws ResourceException { 194 if (AdapterTracing.dbgAdapter.isLoggable(BasicLevel.DEBUG)) 195 AdapterTracing.dbgAdapter.log(BasicLevel.DEBUG, 196 this + " associateConnection(" + connection + ")"); 197 198 if (! isValid()) { 199 if (out != null) 200 out.print("Physical connection to the underlying JORAM server has been lost."); 201 throw new CommException ("Physical connection to the underlying " 202 + "JORAM server has been lost."); 203 } 204 205 if (! (connection instanceof OutboundConnection)) { 206 if (out != null) 207 out.print("The provided connection handle is not a JORAM handle."); 208 throw new ResourceException ("The provided connection handle is not " 209 + "a JORAM handle."); 210 } 211 212 OutboundConnection newConn = (OutboundConnection) connection; 213 newConn.managedCx = this; 214 newConn.xac = (org.objectweb.joram.client.jms.XAConnection) cnx; 215 } 216 217 218 public void addConnectionEventListener(ConnectionEventListener listener) 219 { 220 listeners.add(listener); 221 } 222 223 224 public void removeConnectionEventListener(ConnectionEventListener listener) 225 { 226 listeners.remove(listener); 227 } 228 229 241 public XAResource getXAResource() throws ResourceException { 242 if (AdapterTracing.dbgAdapter.isLoggable(BasicLevel.DEBUG)) 243 AdapterTracing.dbgAdapter.log(BasicLevel.DEBUG, 244 this + " getXAResource()"); 245 246 if (! isValid()) { 247 if (out != null) 248 out.print("Physical connection to the underlying JORAM server has been lost."); 249 throw new CommException ("Physical connection to the underlying " 250 + "JORAM server has been lost."); 251 } 252 253 try { 254 if (AdapterTracing.dbgAdapter.isLoggable(BasicLevel.DEBUG)) 255 AdapterTracing.dbgAdapter.log(BasicLevel.DEBUG, 256 this + " getXAResource session = " + session); 257 258 if (session == null) { 259 OutboundConnection outboundCnx = null; 260 for (java.util.Enumeration e = handles.elements(); e.hasMoreElements(); ) { 261 outboundCnx = (OutboundConnection) e.nextElement(); 262 if (outboundCnx.cnxEquals(cnx)) break; 263 } 264 265 if (outboundCnx == null) 266 outboundCnx = (OutboundConnection) getConnection(null,null); 267 268 if (outboundCnx != null) { 269 if (AdapterTracing.dbgAdapter.isLoggable(BasicLevel.DEBUG)) 270 AdapterTracing.dbgAdapter.log(BasicLevel.DEBUG, 271 this + " getXAResource outboundCnx = " + outboundCnx + 272 "\n outboundCnx.sess = " + outboundCnx.sessions); 273 274 OutboundSession outboundSession = null; 275 if (outboundCnx.sessions.size() > 0) { 276 outboundSession = (OutboundSession) outboundCnx.sessions.get(0); 277 278 if (!(outboundSession.sess instanceof XASession )) { 279 if (AdapterTracing.dbgAdapter.isLoggable(BasicLevel.DEBUG)) 280 AdapterTracing.dbgAdapter.log(BasicLevel.DEBUG, 281 this + " getXAResource outboundSession.sess = " + outboundSession.sess); 282 283 org.objectweb.joram.client.jms.XAResourceMngr xaResourceMngr = null; 285 if (cnx instanceof org.objectweb.joram.client.jms.XAConnection) { 286 xaResourceMngr = ((org.objectweb.joram.client.jms.XAConnection) cnx).getXAResourceMngr(); 287 } else if (cnx instanceof org.objectweb.joram.client.jms.XAQueueConnection) { 288 xaResourceMngr = ((org.objectweb.joram.client.jms.XAQueueConnection) cnx).getXAResourceMngr(); 289 } else if (cnx instanceof org.objectweb.joram.client.jms.XATopicConnection) { 290 xaResourceMngr = ((org.objectweb.joram.client.jms.XATopicConnection) cnx).getXAResourceMngr(); 291 } 292 293 if (xaResourceMngr == null) 294 xaResourceMngr = new org.objectweb.joram.client.jms.XAResourceMngr( 295 (org.objectweb.joram.client.jms.Connection) outboundCnx.xac); 296 297 if (AdapterTracing.dbgAdapter.isLoggable(BasicLevel.DEBUG)) 298 AdapterTracing.dbgAdapter.log(BasicLevel.DEBUG, 299 this + " getXAResource xaResourceMngr = " + xaResourceMngr); 300 301 org.objectweb.joram.client.jms.Session sess = 302 (org.objectweb.joram.client.jms.Session) outboundSession.sess; 303 sess.setTransacted(true); 305 306 session = (Session ) new org.objectweb.joram.client.jms.XASession( 307 (org.objectweb.joram.client.jms.Connection) outboundCnx.xac, 308 sess, 309 xaResourceMngr); 310 311 if (AdapterTracing.dbgAdapter.isLoggable(BasicLevel.DEBUG)) 312 AdapterTracing.dbgAdapter.log(BasicLevel.DEBUG, 313 this + " getXAResource session = " + session); 314 } 315 } else { 316 if (AdapterTracing.dbgAdapter.isLoggable(BasicLevel.DEBUG)) 317 AdapterTracing.dbgAdapter.log(BasicLevel.DEBUG, 318 this + " getXAResource createXASession"); 319 session = cnx.createXASession(); 320 } 321 } 322 } else if (session instanceof org.objectweb.joram.client.jms.XASession) { 323 if (AdapterTracing.dbgAdapter.isLoggable(BasicLevel.DEBUG)) 324 AdapterTracing.dbgAdapter.log(BasicLevel.DEBUG, 325 this + " getXAResource session is XASession and not null"); 326 ((org.objectweb.joram.client.jms.XASession)session).getDelegateSession().setTransacted(true); 328 329 } else if (! (session instanceof javax.jms.XASession )) { 332 if (out != null) 333 out.print("Managed connection not involved in a local transaction."); 334 throw new IllegalStateException ("Managed connection not involved " 335 + "in a local transaction."); 336 } 337 338 339 if (AdapterTracing.dbgAdapter.isLoggable(BasicLevel.DEBUG)) 340 AdapterTracing.dbgAdapter.log(BasicLevel.DEBUG, 341 this + " getXAResource return = " + 342 ((XASession ) session).getXAResource()); 343 344 return ((XASession ) session).getXAResource(); 345 } 346 catch (JMSException exc) { 347 if (out != null) 348 out.print("Could not get XA resource: " + exc); 349 throw new ResourceAdapterInternalException ("Could not get XA resource: " 350 + exc); 351 } 352 } 353 354 365 public javax.resource.spi.LocalTransaction getLocalTransaction() 366 throws ResourceException { 367 if (AdapterTracing.dbgAdapter.isLoggable(BasicLevel.DEBUG)) 368 AdapterTracing.dbgAdapter.log(BasicLevel.DEBUG, 369 this + " getLocalTransaction()"); 370 371 if (! isValid()) { 372 if (out != null) 373 out.print("Physical connection to the underlying JORAM server has been lost."); 374 throw new CommException ("Physical connection to the underlying " 375 + "JORAM server has been lost."); 376 } 377 try { 378 if (AdapterTracing.dbgAdapter.isLoggable(BasicLevel.DEBUG)) 379 AdapterTracing.dbgAdapter.log(BasicLevel.DEBUG, 380 this + " getLocalTransaction session = " + session); 381 if (session == null) 382 session = cnx.createSession(true, 0); 383 else if (session instanceof javax.jms.XASession ) { 384 if (out != null) 385 out.print("Managed connection already involved in a distributed transaction."); 386 throw new IllegalStateException ("Managed connection already involved " 387 + "in a distributed transaction."); 388 } 389 390 return this; 391 } 392 catch (JMSException exc) { 393 if (out != null) 394 out.print("Could not build underlying transacted JMS session: " + exc); 395 throw new LocalTransactionException ("Could not build underlying " 396 + "transacted JMS session: " + exc); 397 } 398 } 399 400 405 public ManagedConnectionMetaData getMetaData() throws ResourceException 406 { 407 if (metaData == null) 408 metaData = new ManagedConnectionMetaDataImpl(userName); 409 410 return metaData; 411 } 412 413 419 public void setLogWriter(PrintWriter out) throws ResourceException 420 { 421 this.out = out; 422 } 423 424 430 public PrintWriter getLogWriter() throws ResourceException 431 { 432 return out; 433 } 434 435 441 public synchronized void cleanup() 442 throws ResourceException { 443 if (AdapterTracing.dbgAdapter.isLoggable(BasicLevel.DEBUG)) 444 AdapterTracing.dbgAdapter.log(BasicLevel.DEBUG, this + " cleanup()"); 445 446 OutboundConnection handle; 447 while (! handles.isEmpty()) { 448 handle = (OutboundConnection) handles.remove(0); 449 handle.cleanup(); 450 } 451 session = null; 452 } 453 454 459 public synchronized void destroy() 460 throws ResourceException { 461 if (AdapterTracing.dbgAdapter.isLoggable(BasicLevel.DEBUG)) 462 AdapterTracing.dbgAdapter.log(BasicLevel.DEBUG, this + " destroy()"); 463 464 cleanup(); 465 466 try { 467 cnx.close(); 468 } 469 catch (Exception exc) {} 470 471 ra.removeProducer(this); 472 473 valid = false; 474 } 475 476 480 public int hashCode() { 481 if (hashCode == -1) 482 hashCode = (mode + ":" + hostName + ":" + ":" + serverPort + "-" + userName).hashCode(); 483 return hashCode; 484 } 485 486 490 public boolean equals(Object o) { 491 if (! (o instanceof ManagedConnectionImpl)) 492 return false; 493 494 ManagedConnectionImpl other = (ManagedConnectionImpl) o; 495 496 boolean res = 497 mode.equals(other.mode) 498 && hostName.equals(other.hostName) 499 && serverPort == other.serverPort 500 && userName.equals(other.userName) 501 && cnx.equals(other.cnx); 502 503 if (AdapterTracing.dbgAdapter.isLoggable(BasicLevel.DEBUG)) 504 AdapterTracing.dbgAdapter.log(BasicLevel.DEBUG, this + " equals = " + res); 505 506 return res; 507 } 508 509 510 public synchronized void onException(JMSException exc) { 511 512 if (AdapterTracing.dbgAdapter.isLoggable(BasicLevel.DEBUG)) 513 AdapterTracing.dbgAdapter.log(BasicLevel.DEBUG, this + " onException(" + exc + ")"); 514 515 if (! isValid()) 517 return; 518 519 if (! (exc instanceof javax.jms.IllegalStateException )) 522 return; 523 524 ConnectionEvent event = 525 new ConnectionEvent (this, ConnectionEvent.CONNECTION_ERROR_OCCURRED); 526 527 ConnectionEventListener listener; 528 for (int i = 0; i < listeners.size(); i++) { 529 listener = (ConnectionEventListener ) listeners.get(i); 530 listener.connectionErrorOccurred(event); 531 } 532 533 valid = false; 534 } 535 536 537 546 public synchronized void begin() throws ResourceException { 547 548 if (AdapterTracing.dbgAdapter.isLoggable(BasicLevel.DEBUG)) 549 AdapterTracing.dbgAdapter.log(BasicLevel.DEBUG, this + " begin()"); 550 551 if (! isValid()) 552 throw new CommException ("Physical connection to the underlying " 553 + "JORAM server has been lost."); 554 555 if (startedLocalTx) 556 throw new LocalTransactionException ("Local transaction has " 557 + "already begun."); 558 559 ConnectionEvent event = 560 new ConnectionEvent (this, ConnectionEvent.LOCAL_TRANSACTION_STARTED); 561 562 ConnectionEventListener listener; 563 for (int i = 0; i < listeners.size(); i++) { 564 listener = (ConnectionEventListener ) listeners.get(i); 565 listener.localTransactionStarted(event); 566 } 567 568 startedLocalTx = true; 569 } 570 571 579 public synchronized void commit() throws ResourceException { 580 581 if (AdapterTracing.dbgAdapter.isLoggable(BasicLevel.DEBUG)) 582 AdapterTracing.dbgAdapter.log(BasicLevel.DEBUG, this + " commit()"); 583 584 if (! isValid()) 585 throw new CommException ("Physical connection to the underlying " 586 + "JORAM server has been lost."); 587 588 if (! startedLocalTx) 589 throw new LocalTransactionException ("Local transaction has not begun."); 590 591 try { 592 session.commit(); 593 } 594 catch (JMSException exc) { 595 throw new LocalTransactionException ("Commit of the transacted JMS " 596 + "session failed: " + exc); 597 } 598 599 ConnectionEvent event = 600 new ConnectionEvent (this, ConnectionEvent.LOCAL_TRANSACTION_COMMITTED); 601 602 ConnectionEventListener listener; 603 for (int i = 0; i < listeners.size(); i++) { 604 listener = (ConnectionEventListener ) listeners.get(i); 605 listener.localTransactionCommitted(event); 606 } 607 608 startedLocalTx = false; 609 } 610 611 619 public synchronized void rollback() throws ResourceException { 620 621 if (AdapterTracing.dbgAdapter.isLoggable(BasicLevel.DEBUG)) 622 AdapterTracing.dbgAdapter.log(BasicLevel.DEBUG, this + " rollback()"); 623 624 if (! isValid()) 625 throw new CommException ("Physical connection to the underlying " 626 + "JORAM server has been lost."); 627 628 if (! startedLocalTx) 629 throw new LocalTransactionException ("Local transaction has not begun."); 630 631 try { 632 session.rollback(); 633 } 634 catch (JMSException exc) { 635 throw new LocalTransactionException ("Rollback of the transacted JMS " 636 + "session failed: " + exc); 637 } 638 639 ConnectionEvent event = 640 new ConnectionEvent (this, ConnectionEvent.LOCAL_TRANSACTION_ROLLEDBACK); 641 642 ConnectionEventListener listener; 643 for (int i = 0; i < listeners.size(); i++) { 644 listener = (ConnectionEventListener ) listeners.get(i); 645 listener.localTransactionRolledback(event); 646 } 647 648 startedLocalTx = false; 649 } 650 651 652 656 boolean matches(String hostName, 657 int serverPort, 658 String userName, 659 String mode) { 660 return this.hostName.equals(hostName) 661 && this.serverPort == serverPort 662 && this.userName.equals(userName) 663 && this.mode.equals(mode); 664 } 665 666 670 boolean isValid() 671 { 672 return valid; 673 } 674 675 676 void closeHandle(OutboundConnection handle) { 677 if (AdapterTracing.dbgAdapter.isLoggable(BasicLevel.DEBUG)) 678 AdapterTracing.dbgAdapter.log(BasicLevel.DEBUG, this + " closeHandle(" + handle + ")"); 679 680 ConnectionEvent event = 681 new ConnectionEvent (this, ConnectionEvent.CONNECTION_CLOSED); 682 event.setConnectionHandle(handle); 683 684 ConnectionEventListener listener; 685 for (int i = 0; i < listeners.size(); i++) { 686 listener = (ConnectionEventListener ) listeners.get(i); 687 listener.connectionClosed(event); 688 } 689 } 690 } 691 | Popular Tags |