1 18 package org.apache.activemq.ra; 19 20 import java.io.PrintWriter ; 21 import java.util.ArrayList ; 22 import java.util.Iterator ; 23 24 import javax.jms.Connection ; 25 import javax.jms.ExceptionListener ; 26 import javax.jms.JMSException ; 27 import javax.resource.ResourceException ; 28 import javax.resource.spi.ConnectionEvent ; 29 import javax.resource.spi.ConnectionEventListener ; 30 import javax.resource.spi.ConnectionRequestInfo ; 31 import javax.resource.spi.LocalTransaction ; 32 import javax.resource.spi.ManagedConnection ; 33 import javax.resource.spi.ManagedConnectionMetaData ; 34 import javax.security.auth.Subject ; 35 import javax.transaction.xa.XAResource ; 36 37 import org.apache.commons.logging.Log; 38 import org.apache.commons.logging.LogFactory; 39 import org.apache.activemq.ActiveMQConnection; 40 import org.apache.activemq.LocalTransactionEventListener; 41 import org.apache.activemq.TransactionContext; 42 43 59 public class ActiveMQManagedConnection implements ManagedConnection , ExceptionListener { 61 private static final Log log = LogFactory.getLog(ActiveMQManagedConnection.class); 62 63 private PrintWriter logWriter; 64 65 private final ActiveMQConnection physicalConnection; 66 private final TransactionContext transactionContext; 67 private final ArrayList proxyConnections = new ArrayList (); 68 private final ArrayList listeners = new ArrayList (); 69 private final LocalAndXATransaction localAndXATransaction; 70 71 private Subject subject; 72 private ActiveMQConnectionRequestInfo info; 73 private boolean destoryed; 74 75 public ActiveMQManagedConnection(Subject subject, ActiveMQConnection physicalConnection, ActiveMQConnectionRequestInfo info) throws ResourceException { 76 try { 77 this.subject = subject; 78 this.info = info; 79 this.physicalConnection = physicalConnection; 80 this.transactionContext = new TransactionContext(physicalConnection); 81 82 this.localAndXATransaction = new LocalAndXATransaction(transactionContext) { 83 public void setInManagedTx(boolean inManagedTx) throws JMSException { 84 super.setInManagedTx(inManagedTx); 85 Iterator iterator = proxyConnections.iterator(); 86 while (iterator.hasNext()) { 87 ManagedConnectionProxy proxy = (ManagedConnectionProxy) iterator.next(); 88 proxy.setUseSharedTxContext(inManagedTx); 89 } 90 } 91 }; 92 93 this.transactionContext.setLocalTransactionEventListener( new LocalTransactionEventListener() { 94 public void beginEvent() { 95 fireBeginEvent(); 96 } 97 public void commitEvent() { 98 fireCommitEvent(); 99 } 100 public void rollbackEvent() { 101 fireRollbackEvent(); 102 } 103 }); 104 105 physicalConnection.setExceptionListener(this); 106 } catch (JMSException e) { 107 throw new ResourceException ("Could not create a new connection: "+e.getMessage(), e); 108 } 109 } 110 111 public boolean isInManagedTx() { 112 return localAndXATransaction.isInManagedTx(); 113 } 114 115 static public boolean matches(Object x, Object y) { 116 if (x == null ^ y == null) { 117 return false; 118 } 119 if (x != null && !x.equals(y)) { 120 return false; 121 } 122 return true; 123 } 124 125 public void associate(Subject subject, ActiveMQConnectionRequestInfo info) throws JMSException { 126 127 if( !matches(info.getUserName(), this.info.getUserName()) || !matches(info.getPassword(), this.info.getPassword()) ) { 129 ((ActiveMQConnection)physicalConnection).changeUserInfo(info.getUserName(), info.getPassword()); 130 } 131 132 if( info.getClientid()!=null && info.getClientid().length()>0 ) 134 physicalConnection.setClientID(info.getClientid()); 135 136 this.subject = subject; 137 this.info = info; 138 } 139 140 public Connection getPhysicalConnection() { 141 return physicalConnection; 142 } 143 144 private void fireBeginEvent() { 145 ConnectionEvent event = new ConnectionEvent (ActiveMQManagedConnection.this, 146 ConnectionEvent.LOCAL_TRANSACTION_STARTED); 147 Iterator iterator = listeners.iterator(); 148 while (iterator.hasNext()) { 149 ConnectionEventListener l = (ConnectionEventListener ) iterator.next(); 150 l.localTransactionStarted(event); 151 } 152 } 153 154 private void fireCommitEvent() { 155 ConnectionEvent event = new ConnectionEvent (ActiveMQManagedConnection.this, 156 ConnectionEvent.LOCAL_TRANSACTION_COMMITTED); 157 Iterator iterator = listeners.iterator(); 158 while (iterator.hasNext()) { 159 ConnectionEventListener l = (ConnectionEventListener ) iterator.next(); 160 l.localTransactionCommitted(event); 161 } 162 } 163 164 private void fireRollbackEvent() { 165 ConnectionEvent event = new ConnectionEvent (ActiveMQManagedConnection.this, 166 ConnectionEvent.LOCAL_TRANSACTION_ROLLEDBACK); 167 Iterator iterator = listeners.iterator(); 168 while (iterator.hasNext()) { 169 ConnectionEventListener l = (ConnectionEventListener ) iterator.next(); 170 l.localTransactionRolledback(event); 171 } 172 } 173 174 private void fireCloseEvent(ManagedConnectionProxy proxy) { 175 ConnectionEvent event = new ConnectionEvent (ActiveMQManagedConnection.this, 176 ConnectionEvent.CONNECTION_CLOSED); 177 event.setConnectionHandle(proxy); 178 179 Iterator iterator = listeners.iterator(); 180 while (iterator.hasNext()) { 181 ConnectionEventListener l = (ConnectionEventListener ) iterator.next(); 182 l.connectionClosed(event); 183 } 184 } 185 186 private void fireErrorOccurredEvent(Exception error) { 187 ConnectionEvent event = new ConnectionEvent (ActiveMQManagedConnection.this, 188 ConnectionEvent.CONNECTION_ERROR_OCCURRED, error); 189 Iterator iterator = listeners.iterator(); 190 while (iterator.hasNext()) { 191 ConnectionEventListener l = (ConnectionEventListener ) iterator.next(); 192 l.connectionErrorOccurred(event); 193 } 194 } 195 196 200 public Object getConnection(Subject subject, ConnectionRequestInfo info) 201 throws ResourceException { 202 ManagedConnectionProxy proxy = new ManagedConnectionProxy(this); 203 proxyConnections.add(proxy); 204 return proxy; 205 } 206 207 private boolean isDestroyed() { 208 return destoryed; 209 } 210 211 216 public void destroy() throws ResourceException { 217 if (isDestroyed()) { 219 return; 220 } 221 222 cleanup(); 223 224 try { 225 physicalConnection.close(); 226 destoryed = true; 227 } catch (JMSException e) { 228 log.info("Error occured during close of a JMS connection.", e); 229 } 230 } 231 232 238 public void cleanup() throws ResourceException { 239 240 if (isDestroyed()) { 242 return; 243 } 244 245 Iterator iterator = proxyConnections.iterator(); 246 while (iterator.hasNext()) { 247 ManagedConnectionProxy proxy = (ManagedConnectionProxy) iterator.next(); 248 proxy.cleanup(); 249 } 250 proxyConnections.clear(); 251 localAndXATransaction.cleanup(); 252 253 try { 254 ((ActiveMQConnection)physicalConnection).cleanup(); 255 } catch (JMSException e) { 256 throw new ResourceException ("Could cleanup the ActiveMQ connection: "+e, e); 257 } 258 259 } 260 261 264 public void associateConnection(Object connection) throws ResourceException { 265 if (connection instanceof ManagedConnectionProxy) { 266 ManagedConnectionProxy proxy = (ManagedConnectionProxy) connection; 267 proxyConnections.add(proxy); 268 } 269 else { 270 throw new ResourceException ("Not supported : associating connection instance of " + connection.getClass().getName()); 271 } 272 } 273 274 277 public void addConnectionEventListener(ConnectionEventListener listener) { 278 listeners.add(listener); 279 } 280 281 284 public void removeConnectionEventListener(ConnectionEventListener listener) { 285 listeners.remove(listener); 286 } 287 288 291 public XAResource getXAResource() throws ResourceException { 292 return localAndXATransaction; 293 } 294 295 298 public LocalTransaction getLocalTransaction() throws ResourceException { 299 return localAndXATransaction; 300 } 301 302 305 public ManagedConnectionMetaData getMetaData() throws ResourceException { 306 return new ManagedConnectionMetaData () { 307 308 public String getEISProductName() throws ResourceException { 309 if (physicalConnection == null) { 310 throw new ResourceException ("Not connected."); 311 } 312 try { 313 return physicalConnection.getMetaData().getJMSProviderName(); 314 } 315 catch (JMSException e) { 316 throw new ResourceException ("Error accessing provider.", e); 317 } 318 } 319 320 public String getEISProductVersion() throws ResourceException { 321 if (physicalConnection == null) { 322 throw new ResourceException ("Not connected."); 323 } 324 try { 325 return physicalConnection.getMetaData().getProviderVersion(); 326 } 327 catch (JMSException e) { 328 throw new ResourceException ("Error accessing provider.", e); 329 } 330 } 331 332 public int getMaxConnections() throws ResourceException { 333 if (physicalConnection == null) { 334 throw new ResourceException ("Not connected."); 335 } 336 return Integer.MAX_VALUE; 337 } 338 339 public String getUserName() throws ResourceException { 340 if (physicalConnection == null) { 341 throw new ResourceException ("Not connected."); 342 } 343 try { 344 return physicalConnection.getClientID(); 345 } 346 catch (JMSException e) { 347 throw new ResourceException ("Error accessing provider.", e); 348 } 349 } 350 }; 351 } 352 353 356 public void setLogWriter(PrintWriter logWriter) throws ResourceException { 357 this.logWriter = logWriter; 358 } 359 360 363 public PrintWriter getLogWriter() throws ResourceException { 364 return logWriter; 365 } 366 367 372 public boolean matches(Subject subject, ConnectionRequestInfo info) { 373 374 if (info == null) { 376 return false; 377 } 378 if (info.getClass() != ActiveMQConnectionRequestInfo.class) { 379 return false; 380 } 381 382 if (subject == null ^ this.subject == null) { 384 return false; 385 } 386 if (subject != null && !subject.equals(this.subject)) { 387 return false; 388 } 389 390 return info.equals(this.info); 392 } 393 394 400 public void proxyClosedEvent(ManagedConnectionProxy proxy) { 401 proxyConnections.remove(proxy); 402 proxy.cleanup(); 403 fireCloseEvent(proxy); 404 } 405 406 public void onException(JMSException e) { 407 log.warn("Connection failed: "+e); 408 log.debug("Cause: ", e); 409 410 for (Iterator iter = proxyConnections.iterator(); iter.hasNext();) { 412 ManagedConnectionProxy proxy = (ManagedConnectionProxy) iter.next(); 413 proxy.onException(e); 414 } 415 fireErrorOccurredEvent(e); 417 } 418 419 422 public TransactionContext getTransactionContext() { 423 return transactionContext; 424 } 425 426 } 427 | Popular Tags |