1 46 47 package org.mr.ra; 48 49 import java.io.PrintWriter ; 50 import java.util.ArrayList ; 51 import java.util.Iterator ; 52 53 import javax.jms.Connection ; 54 import javax.jms.ExceptionListener ; 55 import javax.jms.JMSException ; 56 import javax.resource.ResourceException ; 57 import javax.resource.spi.ConnectionEvent ; 58 import javax.resource.spi.ConnectionEventListener ; 59 import javax.resource.spi.ConnectionRequestInfo ; 60 import javax.resource.spi.LocalTransaction ; 61 import javax.resource.spi.ManagedConnection ; 62 import javax.resource.spi.ManagedConnectionMetaData ; 63 import javax.security.auth.Subject ; 64 import javax.transaction.xa.XAResource ; 65 66 import org.apache.commons.logging.Log; 67 import org.apache.commons.logging.LogFactory; 68 import org.mr.api.jms.LocalTransactionEventListener; 69 import org.mr.api.jms.MantaConnection; 70 import org.mr.api.jms.TransactionContext; 71 74 90 public class ManagedConnectionImpl implements ManagedConnection , ExceptionListener { 92 private static final Log log = LogFactory.getLog(ManagedConnectionImpl.class); 93 94 private PrintWriter logWriter; 95 96 private final MantaConnection physicalConnection; 97 private final TransactionContext transactionContext; 98 private final ArrayList proxyConnections = new ArrayList (); 99 private final ArrayList listeners = new ArrayList (); 100 private final LocalAndXATransaction localAndXATransaction; 101 102 private Subject subject; 103 private ConnectionRequestInfoImpl info; 104 private boolean destoryed; 105 106 public ManagedConnectionImpl(Subject subject, 107 MantaConnection physicalConnection, 108 ConnectionRequestInfoImpl info) 109 throws ResourceException { 110 try { 111 this.subject = subject; 112 this.info = info; 113 this.physicalConnection = physicalConnection; 114 this.transactionContext = new TransactionContext(); 116 117 this.localAndXATransaction = new LocalAndXATransaction(transactionContext) { 118 public void setInManagedTx(boolean inManagedTx) throws JMSException { 119 super.setInManagedTx(inManagedTx); 120 Iterator iterator = proxyConnections.iterator(); 121 while (iterator.hasNext()) { 122 JMSConnectionProxy proxy = (JMSConnectionProxy) iterator.next(); 123 proxy.setUseSharedTxContext(inManagedTx); 124 } 125 } 126 }; 127 128 this.transactionContext.setLocalTransactionEventListener(new LocalTransactionEventListener() { 129 public void beginEvent(String arg0) { 130 fireBeginEvent(); 131 } 132 public void commitEvent(String arg0) { 133 fireCommitEvent(); 134 } 135 public void rollbackEvent(String arg0) { 136 fireRollbackEvent(); 137 } 138 }); 139 140 physicalConnection.setExceptionListener(this); 141 } catch (JMSException e) { 142 throw new ResourceException ("Could not create a new connection: "+e.getMessage(), e); 143 } 144 } 145 146 public boolean isInManagedTx() { 147 return localAndXATransaction.isInManagedTx(); 148 } 149 150 private boolean matches(Object x, Object y) { 152 if (x == null ^ y == null) { 153 return false; 154 } 155 if (x != null && !x.equals(y)) { 156 return false; 157 } 158 return true; 159 } 160 161 175 176 public Connection getPhysicalConnection() { 177 return physicalConnection; 178 } 179 180 181 182 186 public Object getConnection(Subject subject, ConnectionRequestInfo info) 187 throws ResourceException { 188 JMSConnectionProxy proxy = new JMSConnectionProxy(this); 189 proxyConnections.add(proxy); 190 return proxy; 191 } 192 193 private boolean isDestroyed() { 194 return destoryed; 195 } 196 197 202 public void destroy() throws ResourceException { 203 if (isDestroyed()) { 205 return; 206 } 207 208 cleanup(); 209 210 try { 211 physicalConnection.close(); 212 destoryed = true; 213 } catch (JMSException e) { 214 log.info("Error occured during close of a JMS connection.", e); 215 } 216 } 217 218 224 public void cleanup() throws ResourceException { 225 if (isDestroyed()) { 227 return; 228 } 229 230 Iterator iterator = proxyConnections.iterator(); 231 while (iterator.hasNext()) { 232 JMSConnectionProxy proxy = (JMSConnectionProxy) iterator.next(); 233 proxy.cleanup(); 234 } 235 proxyConnections.clear(); 236 237 try { 238 ((MantaConnection)physicalConnection).cleanup(); 239 } catch (JMSException e) { 240 throw new ResourceException ("Could cleanup the Manta connection: "+e, e); 241 } 242 243 } 244 245 248 public void associateConnection(Object connection) throws ResourceException { 249 throw new ResourceException ("No support for connection association"); 250 } 251 252 255 public void addConnectionEventListener(ConnectionEventListener listener) { 256 listeners.add(listener); 257 } 258 259 262 public void removeConnectionEventListener(ConnectionEventListener listener) { 263 listeners.remove(listener); 264 } 265 266 269 public XAResource getXAResource() throws ResourceException { 270 return localAndXATransaction; 271 } 273 274 277 public LocalTransaction getLocalTransaction() throws ResourceException { 278 return localAndXATransaction; 279 } 281 282 285 public ManagedConnectionMetaData getMetaData() throws ResourceException { 286 return new ManagedConnectionMetaData () { 287 288 public String getEISProductName() throws ResourceException { 289 if (physicalConnection == null) { 290 throw new ResourceException ("Not connected."); 291 } 292 try { 293 return physicalConnection.getMetaData().getJMSProviderName(); 294 } 295 catch (JMSException e) { 296 throw new ResourceException ("Error accessing provider.", e); 297 } 298 } 299 300 public String getEISProductVersion() throws ResourceException { 301 if (physicalConnection == null) { 302 throw new ResourceException ("Not connected."); 303 } 304 try { 305 return physicalConnection.getMetaData().getProviderVersion(); 306 } 307 catch (JMSException e) { 308 throw new ResourceException ("Error accessing provider.", e); 309 } 310 } 311 312 public int getMaxConnections() throws ResourceException { 313 if (physicalConnection == null) { 314 throw new ResourceException ("Not connected."); 315 } 316 return Integer.MAX_VALUE; 317 } 318 319 public String getUserName() throws ResourceException { 320 if (physicalConnection == null) { 321 throw new ResourceException ("Not connected."); 322 } 323 try { 324 return physicalConnection.getClientID(); 325 } 326 catch (JMSException e) { 327 throw new ResourceException ("Error accessing provider.", e); 328 } 329 } 330 }; 331 } 332 333 336 public void setLogWriter(PrintWriter logWriter) throws ResourceException { 337 this.logWriter = logWriter; 338 } 339 340 343 public PrintWriter getLogWriter() throws ResourceException { 344 return logWriter; 345 } 346 347 352 public boolean matches(Subject subject, ConnectionRequestInfo info) { 353 if (info == null) { 355 return false; 356 } 357 if (info.getClass() != ConnectionRequestInfoImpl.class) { 358 return false; 359 } 360 361 if (subject == null ^ this.subject == null) { 363 return false; 364 } 365 if (subject != null && !subject.equals(this.subject)) { 366 return false; 367 } 368 369 return info.equals(this.info); 371 } 372 373 379 public void proxyClosedEvent(JMSConnectionProxy proxy) { 380 proxyConnections.remove(proxy); 381 proxy.cleanup(); 382 fireCloseEvent(proxy); 383 } 384 385 public void onException(JMSException e) { 386 log.warn("Connection failed: "+e); 387 log.debug("Cause: ", e); 388 389 for (Iterator iter = proxyConnections.iterator(); iter.hasNext();) { 391 JMSConnectionProxy proxy = (JMSConnectionProxy) iter.next(); 392 proxy.onException(e); 393 } 394 fireErrorOccurredEvent(e); 396 } 397 398 401 TransactionContext getTransactionContext() { 402 return transactionContext; 403 } 404 405 406 407 413 private void fireBeginEvent() { 414 ConnectionEvent event = new ConnectionEvent (ManagedConnectionImpl.this, 415 ConnectionEvent.LOCAL_TRANSACTION_STARTED); 416 Iterator iterator = listeners.iterator(); 417 while (iterator.hasNext()) { 418 ConnectionEventListener l = (ConnectionEventListener ) iterator.next(); 419 l.localTransactionStarted(event); 420 } 421 } 422 423 private void fireCommitEvent() { 424 ConnectionEvent event = new ConnectionEvent (ManagedConnectionImpl.this, 425 ConnectionEvent.LOCAL_TRANSACTION_COMMITTED); 426 Iterator iterator = listeners.iterator(); 427 while (iterator.hasNext()) { 428 ConnectionEventListener l = (ConnectionEventListener ) iterator.next(); 429 l.localTransactionCommitted(event); 430 } 431 } 432 433 private void fireRollbackEvent() { 434 ConnectionEvent event = new ConnectionEvent (ManagedConnectionImpl.this, 435 ConnectionEvent.LOCAL_TRANSACTION_ROLLEDBACK); 436 Iterator iterator = listeners.iterator(); 437 while (iterator.hasNext()) { 438 ConnectionEventListener l = (ConnectionEventListener ) iterator.next(); 439 l.localTransactionRolledback(event); 440 } 441 } 442 443 private void fireCloseEvent(JMSConnectionProxy proxy) { 444 ConnectionEvent event = new ConnectionEvent (ManagedConnectionImpl.this, 445 ConnectionEvent.CONNECTION_CLOSED); 446 event.setConnectionHandle(proxy); 447 448 Iterator iterator = listeners.iterator(); 449 while (iterator.hasNext()) { 450 ConnectionEventListener l = (ConnectionEventListener ) iterator.next(); 451 l.connectionClosed(event); 452 } 453 } 454 455 private void fireErrorOccurredEvent(Exception error) { 456 ConnectionEvent event = new ConnectionEvent (ManagedConnectionImpl.this, 457 ConnectionEvent.CONNECTION_ERROR_OCCURRED, error); 458 Iterator iterator = listeners.iterator(); 459 while (iterator.hasNext()) { 460 ConnectionEventListener l = (ConnectionEventListener ) iterator.next(); 461 l.connectionErrorOccurred(event); 462 } 463 } 464 465 } 466 | Popular Tags |