1 45 package org.exolab.jms.net.connector; 46 47 import java.security.Principal ; 48 import java.util.ArrayList ; 49 import java.util.Collections ; 50 import java.util.HashMap ; 51 import java.util.List ; 52 import java.util.Map ; 53 54 import org.apache.commons.logging.Log; 55 import org.apache.commons.logging.LogFactory; 56 import EDU.oswego.cs.dl.util.concurrent.ClockDaemon; 57 58 import org.exolab.jms.net.uri.URI; 59 import org.exolab.jms.net.util.ThreadFactory; 60 import org.exolab.jms.net.util.Properties; 61 62 63 71 class DefaultConnectionPool 72 implements ManagedConnectionAcceptorListener, 73 ManagedConnectionListener, ConnectionPool { 74 75 78 private final ManagedConnectionFactory _factory; 79 80 83 private final InvocationHandler _handler; 84 85 88 private final ConnectionFactory _resolver; 89 90 93 private List _connections = Collections.synchronizedList(new ArrayList ()); 94 95 99 private Map _handles = Collections.synchronizedMap(new HashMap ()); 100 101 104 private List _acceptors = Collections.synchronizedList(new ArrayList ()); 105 106 109 private List _accepted = Collections.synchronizedList(new ArrayList ()); 110 111 115 private Map _entries = Collections.synchronizedMap(new HashMap ()); 116 117 120 private ClockDaemon _daemon; 121 122 126 private final long _reapInterval; 127 128 131 private volatile CallerListener _listener; 132 133 136 private static final String POOL_PREFIX = "org.exolab.jms.net.pool."; 137 138 141 private static final String REAP_INTERVAL = "reapInterval"; 142 143 146 private static final Log _log 147 = LogFactory.getLog(DefaultConnectionPool.class); 148 149 150 161 public DefaultConnectionPool(ManagedConnectionFactory factory, 162 InvocationHandler handler, 163 ConnectionFactory resolver, 164 Map properties) throws ResourceException { 165 if (factory == null) { 166 throw new IllegalArgumentException ("Argument 'factory' is null"); 167 } 168 if (handler == null) { 169 throw new IllegalArgumentException ("Argument 'handler' is null"); 170 } 171 if (resolver == null) { 172 throw new IllegalArgumentException ("Argument 'resolver' is null"); 173 } 174 _factory = factory; 175 _handler = handler; 176 _resolver = resolver; 177 178 Properties config = new Properties(properties, POOL_PREFIX); 179 int seconds = config.getInt(REAP_INTERVAL, 15); 180 if (seconds < 0) { 181 seconds = 0; 182 } 183 _reapInterval = seconds * 1000; 184 185 } 186 187 195 public ManagedConnection createManagedConnection(Principal principal, 196 ConnectionRequestInfo info) 197 throws ResourceException { 198 ManagedConnection connection = _factory.createManagedConnection( 199 principal, info); 200 return add(connection, false); 201 } 202 203 211 public ManagedConnectionAcceptor createManagedConnectionAcceptor( 212 Authenticator authenticator, ConnectionRequestInfo info) 213 throws ResourceException { 214 215 ManagedConnectionAcceptor acceptor; 216 217 acceptor = _factory.createManagedConnectionAcceptor(authenticator, 218 info); 219 _acceptors.add(acceptor); 220 return acceptor; 221 } 222 223 232 public ManagedConnection matchManagedConnections(Principal principal, 233 ConnectionRequestInfo info) 234 throws ResourceException { 235 236 ManagedConnection result = null; 237 result = _factory.matchManagedConnections(_connections, principal, 238 info); 239 if (result != null) { 240 result = (ManagedConnection) _handles.get(result); 242 } else { 243 result = _factory.matchManagedConnections(_accepted, principal, 244 info); 245 } 246 return result; 247 } 248 249 257 public ManagedConnectionAcceptor matchManagedConnectionAcceptors( 258 ConnectionRequestInfo info) throws ResourceException { 259 260 return _factory.matchManagedConnectionAcceptors(_acceptors, info); 261 } 262 263 268 public ManagedConnectionAcceptorListener 269 getManagedConnectionAcceptorListener() { 270 return this; 271 } 272 273 279 public void accepted(ManagedConnectionAcceptor acceptor, 280 ManagedConnection connection) { 281 try { 282 add(connection, true); 283 } catch (ResourceException exception) { 284 _log.debug("Failed to accept connection", exception); 285 } 286 } 287 288 295 public void closed(ManagedConnection source) { 296 if (_log.isDebugEnabled()) { 297 _log.debug("Connection " + source + " closed by peer, destroying"); 298 } 299 remove(source); 300 } 301 302 310 public void error(ManagedConnection source, Throwable throwable) { 311 if (_log.isDebugEnabled()) { 312 _log.debug("Error on connection " + source + ", destroying", 313 throwable); 314 } 315 remove(source); 316 } 317 318 323 public void close() throws ResourceException { 324 ManagedConnectionAcceptor[] acceptors = 325 (ManagedConnectionAcceptor[]) _acceptors.toArray( 326 new ManagedConnectionAcceptor[0]); 327 _acceptors.clear(); 328 329 for (int i = 0; i < acceptors.length; ++i) { 330 acceptors[i].close(); 331 } 332 333 ManagedConnection[] connections = 334 (ManagedConnection[]) _entries.keySet().toArray( 335 new ManagedConnection[0]); 336 for (int i = 0; i < connections.length; ++i) { 337 connections[i].destroy(); 338 } 339 _entries.clear(); 340 341 _accepted.clear(); 342 _connections.clear(); 343 344 stopReaper(); 345 } 346 347 353 public void error(ManagedConnectionAcceptor acceptor, 354 Throwable throwable) { 355 _acceptors.remove(acceptor); 356 357 String uri = "<unknown>"; 358 try { 359 uri = acceptor.getURI().toString(); 360 } catch (ResourceException ignore) { 361 } 362 _log.error("Failed to accept connections on URI=" + uri, 363 throwable); 364 365 try { 366 acceptor.close(); 367 } catch (ResourceException exception) { 368 if (_log.isDebugEnabled()) { 369 _log.debug("Failed to close acceptor, URI=" + uri, exception); 370 } 371 } 372 } 373 374 379 public void setCallerListener(CallerListener listener) { 380 _listener = listener; 381 } 382 383 396 protected ManagedConnection add(ManagedConnection connection, 397 boolean accepted) throws ResourceException { 398 ManagedConnection result; 399 400 PoolEntry entry = new PoolEntry(connection, accepted); 401 _entries.put(connection, entry); 402 if (accepted) { 403 _accepted.add(connection); 404 result = connection; 405 } else { 406 _connections.add(connection); 407 ManagedConnection handle = new ManagedConnectionHandle( 408 connection, _resolver); 409 _handles.put(connection, handle); 410 result = handle; 411 } 412 ContextInvocationHandler handler = new ContextInvocationHandler( 413 _handler, _resolver); 414 try { 415 connection.setInvocationHandler(handler); 416 connection.setConnectionEventListener(this); 417 handler.setConnection(connection.getConnection()); 418 } catch (ResourceException exception) { 419 try { 420 _log.debug("Failed to initialise connection, destroying", 421 exception); 422 connection.destroy(); 423 } catch (ResourceException nested) { 424 _log.debug("Failed to destroy connection", nested); 425 } finally { 426 _entries.remove(connection); 427 if (accepted) { 428 _accepted.remove(connection); 429 } else { 430 _connections.remove(connection); 431 _handles.remove(connection); 432 } 433 } 434 throw exception; 436 } 437 438 entry.setInitialised(); 441 442 startReaper(); 443 444 return result; 445 } 446 447 452 protected void remove(ManagedConnection connection) { 453 PoolEntry entry = (PoolEntry) _entries.remove(connection); 454 if (entry != null) { 455 if (entry.getAccepted()) { 456 _accepted.remove(connection); 457 } else { 458 _connections.remove(connection); 459 _handles.remove(connection); 460 } 461 URI remoteURI = null; 462 URI localURI = null; 463 try { 464 remoteURI = connection.getRemoteURI(); 465 localURI = connection.getLocalURI(); 466 } catch (ResourceException exception) { 467 _log.debug("Failed to get connection URIs", exception); 468 } 469 470 try { 471 connection.destroy(); 472 } catch (ResourceException exception) { 473 _log.debug("Failed to destroy connection", exception); 474 } 475 if (remoteURI != null && localURI != null) { 476 notifyDisconnection(remoteURI, localURI); 477 } 478 } else { 479 _log.debug("ManagedConnection not found"); 480 } 481 if (_entries.isEmpty()) { 482 stopReaper(); 483 } 484 } 485 486 492 private void notifyDisconnection(URI remoteURI, URI localURI) { 493 CallerListener listener = _listener; 494 if (listener != null) { 495 listener.disconnected(new CallerImpl(remoteURI, localURI)); 496 } 497 } 498 499 502 private synchronized void startReaper() { 503 if (_daemon == null && _reapInterval > 0) { 504 _daemon = new ClockDaemon(); 505 ThreadFactory creator = 506 new ThreadFactory(null, "ManagedConnectionReaper", false); 507 _daemon.setThreadFactory(creator); 508 _daemon.executePeriodically(_reapInterval, new Reaper(), false); 509 } 510 } 511 512 515 private synchronized void stopReaper() { 516 if (_daemon != null) { 517 _daemon.shutDown(); 518 _daemon = null; 519 } 520 } 521 522 525 private class Reaper implements Runnable { 526 527 530 private long _lastReapTimestamp = 0; 531 532 535 public Reaper() { 536 _lastReapTimestamp = System.currentTimeMillis(); 537 } 538 539 542 public void run() { 543 try { 544 reapIdleConnections(); 545 if (!done()) { 546 reapDeadConnections(); 547 } 548 } catch (Throwable exception) { 549 _log.error(exception, exception); 550 } 551 _lastReapTimestamp = System.currentTimeMillis(); 552 } 553 554 557 private void reapIdleConnections() { 558 Map.Entry [] entries = (Map.Entry []) _handles.entrySet().toArray( 559 new Map.Entry [0]); 560 for (int i = 0; i < entries.length && !done(); ++i) { 561 Map.Entry entry = entries[i]; 562 ManagedConnection connection = 563 (ManagedConnection) entry.getKey(); 564 PoolEntry pooled = (PoolEntry) _entries.get(connection); 565 if (pooled != null && pooled.isInitialised()) { 566 ManagedConnectionHandle handle = 567 (ManagedConnectionHandle) entry.getValue(); 568 if (handle.canDestroy() && idle(handle)) { 569 if (_log.isDebugEnabled()) { 570 try { 571 _log.debug("Reaping idle connection, URI=" 572 + connection.getRemoteURI() 573 + ", local URI=" 574 + connection.getLocalURI()); 575 } catch (ResourceException ignore) { 576 } 578 } 579 remove(connection); 580 } 581 } 582 } 583 } 584 585 592 private boolean idle(ManagedConnectionHandle handle) { 593 boolean result = false; 594 long timestamp = handle.getLastUsedTimestamp(); 595 if (timestamp == 0) { 596 handle.updateLastUsedTimestamp(); 599 } else if (timestamp < _lastReapTimestamp) { 600 result = true; 601 } 602 return result; 603 } 604 605 608 private void reapDeadConnections() { 609 PoolEntry[] entries = (PoolEntry[]) _entries.values().toArray( 610 new PoolEntry[0]); 611 for (int i = 0; i < entries.length && !done(); ++i) { 612 PoolEntry entry = entries[i]; 613 if (entry.isInitialised()) { 614 ManagedConnection connection = entry.getManagedConnection(); 615 if (!connection.isAlive()) { 616 if (_log.isDebugEnabled()) { 617 try { 618 _log.debug("Reaping dead connection, URI=" 619 + connection.getRemoteURI() 620 + ", local URI=" 621 + connection.getLocalURI()); 622 } catch (ResourceException ignore) { 623 } 625 } 626 remove(connection); 627 } 628 } 629 } 630 } 631 632 638 private boolean done() { 639 return Thread.currentThread().isInterrupted(); 640 } 641 } 642 643 } 644 | Popular Tags |