1 22 package org.jboss.resource.connectionmanager; 23 24 import java.io.ByteArrayOutputStream ; 25 import java.io.PrintStream ; 26 import java.lang.reflect.Method ; 27 import java.util.ArrayList ; 28 import java.util.Collection ; 29 import java.util.HashMap ; 30 import java.util.HashSet ; 31 import java.util.Iterator ; 32 import java.util.LinkedList ; 33 import java.util.Map ; 34 import java.util.Set ; 35 import java.util.WeakHashMap ; 36 37 import javax.management.ObjectName ; 38 import javax.resource.ResourceException ; 39 import javax.resource.spi.ConnectionRequestInfo ; 40 import javax.transaction.Synchronization ; 41 import javax.transaction.SystemException ; 42 import javax.transaction.Transaction ; 43 import javax.transaction.TransactionManager ; 44 45 import org.jboss.ejb.EnterpriseContext; 46 import org.jboss.system.ServiceMBeanSupport; 47 import org.jboss.tm.TxUtils; 48 import org.jboss.tm.usertx.client.ServerVMClientUserTransaction; 49 import org.jboss.util.Strings; 50 51 63 public class CachedConnectionManager 64 extends ServiceMBeanSupport 65 implements ServerVMClientUserTransaction.UserTransactionStartedListener, 66 CachedConnectionManagerMBean 67 { 68 private boolean specCompliant; 69 70 protected boolean trace; 71 72 private boolean debug; 73 74 protected boolean error; 75 76 private ObjectName transactionManagerServiceName; 77 private TransactionManager tm; 78 79 86 private final ThreadLocal currentObjects = new ThreadLocal (); 87 88 93 private final Map objectToConnectionManagerMap = new HashMap (); 94 95 98 private Map connectionStackTraces = new WeakHashMap (); 99 100 106 public CachedConnectionManager() 107 { 108 super(); 109 trace = log.isTraceEnabled(); 110 } 111 112 public boolean isSpecCompliant() 113 { 114 return specCompliant; 115 } 116 117 public void setSpecCompliant(boolean specCompliant) 118 { 119 if (specCompliant) 120 log.warn("THE SpecCompliant ATTRIBUTE IS MISNAMED SEE http://jira.jboss.com/jira/browse/JBAS-1662"); 121 this.specCompliant = specCompliant; 122 } 123 124 public boolean isDebug() 125 { 126 return debug; 127 } 128 129 public void setDebug(boolean value) 130 { 131 this.debug = value; 132 } 133 134 public boolean isError() 135 { 136 return error; 137 } 138 139 public void setError(boolean value) 140 { 141 this.error = value; 142 } 143 144 public ObjectName getTransactionManagerServiceName() 145 { 146 return transactionManagerServiceName; 147 } 148 149 public void setTransactionManagerServiceName(ObjectName transactionManagerServiceName) 150 { 151 this.transactionManagerServiceName = transactionManagerServiceName; 152 } 153 154 public CachedConnectionManager getInstance() 155 { 156 return this; 157 } 158 159 public int getInUseConnections() 160 { 161 synchronized (connectionStackTraces) 162 { 163 return connectionStackTraces.size(); 164 } 165 } 166 167 public Map listInUseConnections() 168 { 169 synchronized (connectionStackTraces) 170 { 171 HashMap result = new HashMap (); 172 for (Iterator i = connectionStackTraces.entrySet().iterator(); i.hasNext();) 173 { 174 Map.Entry entry = (Map.Entry ) i.next(); 175 Throwable stackTrace = (Throwable ) entry.getValue(); 176 ByteArrayOutputStream baos = new ByteArrayOutputStream (); 177 PrintStream ps = new PrintStream (baos); 178 stackTrace.printStackTrace(ps); 179 result.put(entry.getKey().toString(), baos.toString()); 180 } 181 return result; 182 } 183 } 184 185 protected void startService() 186 throws Exception 187 { 188 tm = (TransactionManager ) getServer().getAttribute(transactionManagerServiceName, 189 "TransactionManager"); 190 TransactionSynchronizer.setTransactionManager(tm); 191 ServerVMClientUserTransaction.getSingleton().registerTxStartedListener(this); 192 EnterpriseContext.setUserTransactionStartedListener(this); 193 } 194 195 protected void stopService() throws Exception 196 { 197 ServerVMClientUserTransaction.getSingleton().unregisterTxStartedListener(this); 198 EnterpriseContext.setUserTransactionStartedListener(null); 199 } 200 201 203 211 public void pushMetaAwareObject(final Object rawKey, Set unsharableResources) 212 throws ResourceException 213 { 214 LinkedList stack = (LinkedList ) currentObjects.get(); 215 if (stack == null) 216 { 217 if (trace) 218 log.trace("new stack for key: " + Strings.defaultToString(rawKey)); 219 stack = new LinkedList (); 220 currentObjects.set(stack); 221 } else 223 { 224 if (trace) 225 log.trace("old stack for key: " + Strings.defaultToString(rawKey)); 226 } KeyConnectionAssociation key = new KeyConnectionAssociation(rawKey); 233 if (specCompliant && !stack.contains(key)) 234 { 235 reconnect(key, unsharableResources); 236 } 237 stack.addLast(key); 238 } 239 240 246 public void popMetaAwareObject(Set unsharableResources) 247 throws ResourceException 248 { 249 LinkedList stack = (LinkedList ) currentObjects.get(); 250 KeyConnectionAssociation oldKey = (KeyConnectionAssociation) stack.removeLast(); 251 if (trace) 252 log.trace("popped object: " + Strings.defaultToString(oldKey)); 253 if (specCompliant) 254 { 255 if (!stack.contains(oldKey)) 256 { 257 disconnect(oldKey, unsharableResources); 258 } } 260 else if (debug) 261 { 262 if (closeAll(oldKey.getCMToConnectionsMap()) && error) 263 throw new ResourceException ("Some connections were not closed, see the log for the allocation stacktraces"); 264 } 265 266 } 270 271 KeyConnectionAssociation peekMetaAwareObject() 272 { 273 LinkedList stack = (LinkedList ) currentObjects.get(); 274 if (stack == null) 275 return null; 276 if (!stack.isEmpty()) 277 return (KeyConnectionAssociation) stack.getLast(); 278 else 279 return null; 280 } 281 282 284 void registerConnection(ConnectionCacheListener cm, ConnectionListener cl, Object connection, ConnectionRequestInfo cri) 285 { 286 if (debug) 287 { 288 synchronized (connectionStackTraces) 289 { 290 connectionStackTraces.put(connection, new Throwable ("STACKTRACE")); 291 } 292 } 293 294 KeyConnectionAssociation key = peekMetaAwareObject(); 295 if (trace) 296 log.trace("registering connection from " + cm + ", connection : " + connection + ", key: " + key); 297 if (key == null) 298 return; 300 ConnectionRecord cr = new ConnectionRecord(cl, connection, cri); 301 Map cmToConnectionsMap = key.getCMToConnectionsMap(); 302 Collection conns = (Collection ) cmToConnectionsMap.get(cm); 303 if (conns == null) 304 { 305 conns = new ArrayList (); 306 cmToConnectionsMap.put(cm, conns); 307 } 308 conns.add(cr); 309 } 310 311 void unregisterConnection(ConnectionCacheListener cm, Object c) 312 { 313 if (debug) 314 { 315 CloseConnectionSynchronization cas = getCloseConnectionSynchronization(false); 316 if (cas != null) 317 cas.remove(c); 318 synchronized (connectionStackTraces) 319 { 320 connectionStackTraces.remove(c); 321 } 322 } 323 324 KeyConnectionAssociation key = peekMetaAwareObject(); 325 if (trace) 326 log.trace("unregistering connection from " + cm + ", object: " + c + ", key: " + key); 327 if (key == null) 328 return; 330 Map cmToConnectionsMap = key.getCMToConnectionsMap(); 331 Collection conns = (Collection ) cmToConnectionsMap.get(cm); 332 if (conns == null) 333 return; for (Iterator i = conns.iterator(); i.hasNext();) 335 { 336 if (((ConnectionRecord) i.next()).connection == c) 337 { 338 i.remove(); 339 return; 340 } 341 } 342 throw new IllegalStateException ("Trying to return an unknown connection2! " + c); 343 } 344 345 public void userTransactionStarted() 347 throws SystemException 348 { 349 KeyConnectionAssociation key = peekMetaAwareObject(); 350 if (trace) 351 log.trace("user tx started, key: " + key); 352 if (key == null) 353 return; 355 Map cmToConnectionsMap = key.getCMToConnectionsMap(); 356 for (Iterator i = cmToConnectionsMap.keySet().iterator(); i.hasNext();) 357 { 358 ConnectionCacheListener cm = (ConnectionCacheListener) i.next(); 359 Collection conns = (Collection ) cmToConnectionsMap.get(cm); 360 cm.transactionStarted(conns); 361 } 362 } 363 364 373 private void reconnect(KeyConnectionAssociation key, Set unsharableResources) 374 throws ResourceException 375 { 376 Map cmToConnectionsMap = null; 377 synchronized (objectToConnectionManagerMap) 378 { 379 cmToConnectionsMap = (Map ) objectToConnectionManagerMap.get(key); 380 if (cmToConnectionsMap == null) 381 return; 382 } 383 key.setCMToConnectionsMap(cmToConnectionsMap); 384 for (Iterator i = cmToConnectionsMap.keySet().iterator(); i.hasNext();) 385 { 386 ConnectionCacheListener cm = (ConnectionCacheListener) i.next(); 387 Collection conns = (Collection ) cmToConnectionsMap.get(cm); 388 cm.reconnect(conns, unsharableResources); 389 } 390 } 391 392 private void disconnect(KeyConnectionAssociation key, Set unsharableResources) 393 throws ResourceException 394 { 395 Map cmToConnectionsMap = key.getCMToConnectionsMap(); 396 if (!cmToConnectionsMap.isEmpty()) 397 { 398 synchronized (objectToConnectionManagerMap) 399 { 400 objectToConnectionManagerMap.put(key, cmToConnectionsMap); 401 } 402 for (Iterator i = cmToConnectionsMap.keySet().iterator(); i.hasNext();) 403 { 404 ConnectionCacheListener cm = (ConnectionCacheListener) i.next(); 405 Collection conns = (Collection ) cmToConnectionsMap.get(cm); 406 cm.disconnect(conns, unsharableResources); 407 } 408 } 409 } 410 411 private boolean closeAll(Map cmToConnectionsMap) 412 { 413 if (debug == false) 414 return false; 415 416 boolean unclosed = false; 417 418 Collection connections = cmToConnectionsMap.values(); 419 if (connections.size() != 0) 420 { 421 for (Iterator i = connections.iterator(); i.hasNext();) 422 { 423 Collection conns = (Collection ) i.next(); 424 for (Iterator j = conns.iterator(); j.hasNext();) 425 { 426 Object c = ((ConnectionRecord) j.next()).connection; 427 CloseConnectionSynchronization cas = getCloseConnectionSynchronization(true); 428 if (cas == null) 429 { 430 unclosed = true; 431 closeConnection(c); 432 } 433 else 434 cas.add(c); 435 } 436 } 437 } 438 439 return unclosed; 440 } 441 442 451 void unregisterConnectionCacheListener(ConnectionCacheListener cm) 452 { 453 if (trace) 454 log.trace("unregisterConnectionCacheListener: " + cm); 455 synchronized (objectToConnectionManagerMap) 456 { 457 for (Iterator i = objectToConnectionManagerMap.values().iterator(); i.hasNext();) 458 { 459 Map cmToConnectionsMap = (Map ) i.next(); 460 if (cmToConnectionsMap != null) 461 cmToConnectionsMap.remove(cm); 462 } 463 } 464 } 465 466 470 private final static class KeyConnectionAssociation 471 { 472 private final Object o; 474 475 private Map cmToConnectionsMap; 477 478 KeyConnectionAssociation(final Object o) 479 { 480 this.o = o; 481 } 482 483 public boolean equals(Object other) 484 { 485 return (other instanceof KeyConnectionAssociation) && o == ((KeyConnectionAssociation) other).o; 486 } 487 488 public String toString() 489 { 490 return Strings.defaultToString(o); 491 } 492 493 public int hashCode() 494 { 495 return System.identityHashCode(o); 496 } 497 498 public void setCMToConnectionsMap(Map cmToConnectionsMap) 499 { 500 this.cmToConnectionsMap = cmToConnectionsMap; 501 } 502 503 public Map getCMToConnectionsMap() 504 { 505 if (cmToConnectionsMap == null) 506 { 507 cmToConnectionsMap = new HashMap (); 508 } return cmToConnectionsMap; 510 } 511 } 512 513 private void closeConnection(Object c) 514 { 515 try 516 { 517 Throwable e; 518 synchronized (connectionStackTraces) 519 { 520 e = (Throwable ) connectionStackTraces.remove(c); 521 } 522 Method m = c.getClass().getMethod("close", new Class []{}); 523 try 524 { 525 if (e != null) 526 log.info("Closing a connection for you. Please close them yourself: " + c, e); 527 else 528 log.info("Closing a connection for you. Please close them yourself: " + c); 529 m.invoke(c, new Object []{}); 530 } 531 catch (Throwable t) 532 { 533 log.info("Throwable trying to close a connection for you, please close it yourself", t); 534 } 535 } 536 catch (NoSuchMethodException nsme) 537 { 538 log.info("Could not find a close method on alleged connection objects. Please close your own connections."); 539 } 540 } 541 542 private CloseConnectionSynchronization getCloseConnectionSynchronization(boolean createIfNotFound) 543 { 544 try 545 { 546 Transaction tx = tm.getTransaction(); 547 if (TxUtils.isActive(tx)) 548 { 549 TransactionSynchronizer.lock(tx); 550 try 551 { 552 CloseConnectionSynchronization cas = (CloseConnectionSynchronization) TransactionSynchronizer.getCCMSynchronization(tx); 553 if (cas == null && createIfNotFound) 554 { 555 cas = new CloseConnectionSynchronization(); 556 TransactionSynchronizer.registerCCMSynchronization(tx, cas); 557 } 558 return cas; 559 } 560 finally 561 { 562 TransactionSynchronizer.unlock(tx); 563 } 564 } 565 } 566 catch (Throwable t) 567 { 568 log.debug("Unable to synchronize with transaction", t); 569 } 570 return null; 571 } 572 573 private class CloseConnectionSynchronization implements Synchronization 574 { 575 HashSet connections = new HashSet (); 576 boolean closing = false; 577 578 public CloseConnectionSynchronization() 579 { 580 } 581 582 public synchronized void add(Object c) 583 { 584 if (closing) 585 return; 586 connections.add(c); 587 } 588 589 public synchronized void remove(Object c) 590 { 591 if (closing) 592 return; 593 connections.remove(c); 594 } 595 596 public void beforeCompletion() 597 { 598 } 599 600 public void afterCompletion(int status) 601 { 602 synchronized (this) 603 { 604 closing = true; 605 } 606 for (Iterator i = connections.iterator(); i.hasNext();) 607 closeConnection(i.next()); 608 connections.clear(); } 610 } 611 } 612 | Popular Tags |