package org.exolab.jms.net.multiplexer;

import java.io.IOException;
import java.security.Principal;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import EDU.oswego.cs.dl.util.concurrent.PooledExecutor;

import org.exolab.jms.net.connector.AbstractManagedConnection;
import org.exolab.jms.net.connector.Authenticator;
import org.exolab.jms.net.connector.Caller;
import org.exolab.jms.net.connector.CallerImpl;
import org.exolab.jms.net.connector.ConnectException;
import org.exolab.jms.net.connector.Connection;
import org.exolab.jms.net.connector.IllegalStateException;
import org.exolab.jms.net.connector.InvocationHandler;
import org.exolab.jms.net.connector.Request;
import org.exolab.jms.net.connector.ResourceException;
import org.exolab.jms.net.connector.Response;
import org.exolab.jms.net.connector.SecurityException;
import org.exolab.jms.net.uri.URI;
import org.exolab.jms.net.util.ThreadPool;


/**
 * A managed connection that routes every invocation through a
 * {@link Multiplexer} running on its own thread over a single
 * {@link Endpoint}.
 * <p>
 * Subclasses supply the transport by implementing
 * {@link #createEndpoint()}. Nothing is connected until
 * {@link #setInvocationHandler} is called; that call creates the endpoint
 * and the multiplexer and starts the multiplexer thread.
 * <p>
 * An instance built with the {@link Principal} constructor has no
 * authenticator and is treated as the client side (see
 * {@link #isClient()}); an instance built with an {@link Authenticator} is
 * treated as the server side.
 */
public abstract class MultiplexedManagedConnection
    extends AbstractManagedConnection
    implements MultiplexerListener {

    /**
     * The multiplexer, or <code>null</code> before
     * {@link #setInvocationHandler} succeeds and after {@link #destroy()}.
     */
    private Multiplexer _multiplexer;

    /**
     * The thread running {@link #_multiplexer}.
     */
    private Thread _multiplexThread;

    /**
     * The endpoint returned by {@link #createEndpoint()}.
     */
    private Endpoint _endpoint;

    /**
     * The handler that receives invocations arriving via
     * {@link #request(Channel)}.
     */
    private InvocationHandler _invoker;

    /**
     * The security principal. Supplied at construction on the client side;
     * taken from the multiplexer on the server side. May be
     * <code>null</code>.
     */
    private Principal _principal;

    /**
     * The authenticator, or <code>null</code> for a client connection.
     */
    private Authenticator _authenticator;

    /**
     * The caller passed to each {@link ChannelInvocation}. Only assigned
     * for server connections.
     */
    private Caller _caller;

    /**
     * The thread pool handed to the multiplexer. Either supplied at
     * construction or created on demand by {@link #getThreadPool()}.
     */
    private PooledExecutor _pool;

    /**
     * <code>true</code> if {@link #_pool} was supplied by the creator of
     * this connection; such a pool is not shut down by {@link #destroy()}.
     */
    private final boolean _sharedPool;

    /**
     * The thread group used for the multiplexer thread and for a pool
     * created by {@link #getThreadPool()}.
     */
    private ThreadGroup _group;

    /**
     * Size argument passed to the {@link ThreadPool} created when no pool
     * was supplied.
     */
    private static final int THREAD_POOL_SIZE = 10;

    /**
     * The logger.
     */
    private static final Log _log =
        LogFactory.getLog(MultiplexedManagedConnection.class);


    /**
     * Constructs a client-side connection.
     *
     * @param principal the security principal; may be <code>null</code>
     */
    public MultiplexedManagedConnection(Principal principal) {
        _principal = principal;
        _sharedPool = false;
    }

    /**
     * Constructs a server-side connection.
     *
     * @param authenticator the authenticator handed to the multiplexer
     * @param pool          a thread pool to use, or <code>null</code> to
     *                      have one created on demand. A supplied pool is
     *                      considered shared and is left running by
     *                      {@link #destroy()}
     * @throws IllegalArgumentException if <code>authenticator</code> is
     *                                  <code>null</code>
     */
    public MultiplexedManagedConnection(Authenticator authenticator,
                                        PooledExecutor pool) {
        if (authenticator == null) {
            throw new IllegalArgumentException(
                "Argument 'authenticator' is null");
        }
        _authenticator = authenticator;
        _pool = pool;
        _sharedPool = (pool != null);
    }

    /**
     * Registers the invocation handler, creates the endpoint and
     * multiplexer, and starts the multiplexer thread.
     * <p>
     * On the server side the principal is replaced by the one reported by
     * the multiplexer, and the caller is built from the remote and local
     * URIs.
     * <p>
     * The handler is recorded before anything is created, so it remains
     * registered even if this method subsequently fails.
     * NOTE(review): the fields written here are read elsewhere under
     * <code>synchronized (this)</code>, but are written without holding the
     * lock - confirm that callers serialise access.
     *
     * @param handler the invocation handler
     * @throws IllegalStateException if a handler is already registered
     * @throws ConnectException      if an I/O error prevents the endpoint
     *                               or multiplexer from being created
     * @throws ResourceException     for any other failure reported while
     *                               creating the multiplexer or caller
     */
    public void setInvocationHandler(InvocationHandler handler)
        throws ResourceException {
        if (_invoker != null) {
            throw new IllegalStateException(
                "An invocation handler is already registered");
        }
        _invoker = handler;
        try {
            _endpoint = createEndpoint();
            if (isClient()) {
                _multiplexer = createMultiplexer(_endpoint, _principal,
                                                 getThreadPool());
            } else {
                _multiplexer = createMultiplexer(_endpoint, _authenticator,
                                                 getThreadPool());
                _principal = _multiplexer.getPrincipal();
                _caller = new CallerImpl(getRemoteURI(), getLocalURI());
            }
            String name = getDisplayName() + "-Multiplexer";
            _multiplexThread = new Thread(getThreadGroup(), _multiplexer,
                                          name);
            _multiplexThread.start();
        } catch (IOException exception) {
            throw new ConnectException("Failed to start multiplexer",
                                       exception);
        }
    }

    /**
     * Creates a new connection handle backed by this managed connection.
     *
     * @return a new {@link MultiplexedConnection}
     * @throws IllegalStateException if no invocation handler has been
     *                               registered
     */
    public synchronized Connection getConnection()
        throws IllegalStateException {
        if (_invoker == null) {
            throw new IllegalStateException("No InvocationHandler registered");
        }
        return new MultiplexedConnection(this);
    }

    /**
     * Determines whether the connection is usable by pinging over a
     * channel obtained from the multiplexer.
     *
     * @return <code>true</code> if the ping completed; <code>false</code>
     *         if there is no multiplexer, or if an I/O error occurred
     *         before the ping completed
     */
    public boolean isAlive() {
        boolean alive = false;
        Multiplexer multiplexer;
        synchronized (this) {
            multiplexer = _multiplexer;
        }
        if (multiplexer != null) {
            Channel channel = null;
            try {
                channel = multiplexer.getChannel();
                channel.ping();
                alive = true;
                channel.release();
            } catch (IOException exception) {
                _log.debug("Failed to ping", exception);
                if (channel != null) {
                    // the channel is in an unknown state: discard it rather
                    // than releasing it
                    channel.destroy();
                }
            }
        }
        return alive;
    }

    /**
     * Destroys the connection.
     * <p>
     * A pool created by this connection is asked to shut down once its
     * queued tasks have been processed; a shared pool is left alone. If a
     * multiplexer exists it is closed and, unless this method is running on
     * the multiplexer thread itself, the call waits for that thread to
     * terminate. If no multiplexer was created, the endpoint (if any) is
     * closed directly. The multiplexer, thread and endpoint references are
     * always cleared.
     *
     * @throws ResourceException if the endpoint cannot be closed
     */
    public void destroy() throws ResourceException {
        if (!_sharedPool && _pool != null) {
            _pool.shutdownAfterProcessingCurrentlyQueuedTasks();
        }

        Multiplexer multiplexer;
        Thread thread;
        Endpoint endpoint;

        synchronized (this) {
            multiplexer = _multiplexer;
            thread = _multiplexThread;
            endpoint = _endpoint;
        }
        try {
            if (multiplexer != null) {
                // the endpoint is not closed directly on this path;
                // presumably the multiplexer closes it - TODO confirm
                multiplexer.close();

                // The thread is null if setInvocationHandler() failed after
                // creating the multiplexer but before creating the thread.
                // Joining the current thread would block forever.
                if (thread != null && thread != Thread.currentThread()) {
                    try {
                        thread.join();
                    } catch (InterruptedException exception) {
                        _log.debug(exception);
                        // keep the interrupt visible to the caller
                        Thread.currentThread().interrupt();
                    }
                }
            } else if (endpoint != null) {
                try {
                    endpoint.close();
                } catch (IOException exception) {
                    throw new ResourceException("Failed to close endpoint",
                                                exception);
                }
            }
        } finally {
            synchronized (this) {
                _multiplexer = null;
                _multiplexThread = null;
                _endpoint = null;
            }
        }
    }

    /**
     * Determines whether this connection's principal matches the supplied
     * one.
     *
     * @param principal the principal to compare; may be <code>null</code>
     * @return <code>true</code> if both principals are <code>null</code>,
     *         or if this connection's principal equals
     *         <code>principal</code>
     */
    public boolean hasPrincipal(Principal principal) {
        if (_principal == null) {
            return (principal == null);
        }
        return _principal.equals(principal);
    }

    /**
     * Passes a request arriving on a channel to the registered invocation
     * handler, wrapped in a {@link ChannelInvocation} together with the
     * caller.
     *
     * @param channel the channel the request arrived on
     */
    public void request(Channel channel) {
        _invoker.invoke(new ChannelInvocation(channel, getCaller()));
    }

    /**
     * Propagates closure by delegating to <code>notifyClosed()</code>.
     */
    public void closed() {
        notifyClosed();
    }

    /**
     * Propagates an error by delegating to <code>notifyError()</code>.
     *
     * @param error the error
     */
    public void error(Throwable error) {
        notifyError(error);
    }

    /**
     * Performs an invocation over a channel obtained from the multiplexer.
     * <p>
     * Failures are never thrown: any exception is logged at debug level and
     * returned wrapped in the response, and the channel is destroyed rather
     * than released.
     *
     * @param connection the connection performing the invocation (not used
     *                   by this implementation)
     * @param request    the request to send
     * @return the response; wraps a {@link ResourceException} with the
     *         message "Connection lost" if there is no multiplexer
     */
    protected Response invoke(Connection connection, Request request) {
        Response response;
        Multiplexer multiplexer;
        synchronized (this) {
            multiplexer = _multiplexer;
        }
        if (multiplexer != null) {
            Channel channel = null;
            try {
                channel = multiplexer.getChannel();
                response = channel.invoke(request);
                channel.release();
            } catch (Exception exception) {
                _log.debug(exception, exception);
                response = new Response(exception);
                if (channel != null) {
                    channel.destroy();
                }
            }
        } else {
            response = new Response(new ResourceException("Connection lost"));
        }

        return response;
    }

    /**
     * Creates the endpoint that the multiplexer communicates over.
     *
     * @return a new endpoint
     * @throws IOException for any I/O error
     */
    protected abstract Endpoint createEndpoint() throws IOException;

    /**
     * Creates the multiplexer for a client connection.
     *
     * @param endpoint  the endpoint to multiplex over
     * @param principal the security principal; may be <code>null</code>
     * @param pool      the thread pool handed to the multiplexer
     * @return a new multiplexer that reports to this connection
     * @throws IOException       for any I/O error
     * @throws SecurityException if reported by the {@link Multiplexer}
     *                           constructor
     */
    protected Multiplexer createMultiplexer(Endpoint endpoint,
                                            Principal principal,
                                            PooledExecutor pool)
        throws IOException, SecurityException {
        return new Multiplexer(this, endpoint, principal, pool);
    }

    /**
     * Creates the multiplexer for a server connection.
     *
     * @param endpoint      the endpoint to multiplex over
     * @param authenticator the authenticator handed to the multiplexer
     * @param pool          the thread pool handed to the multiplexer
     * @return a new multiplexer that reports to this connection
     * @throws IOException       for any I/O error
     * @throws ResourceException if reported by the {@link Multiplexer}
     *                           constructor
     */
    protected Multiplexer createMultiplexer(Endpoint endpoint,
                                            Authenticator authenticator,
                                            PooledExecutor pool)
        throws IOException, ResourceException {
        return new Multiplexer(this, endpoint, authenticator, pool);
    }

    /**
     * Returns the thread pool, creating a {@link ThreadPool} in this
     * connection's thread group on first use if none was supplied at
     * construction.
     *
     * @return the thread pool
     */
    protected synchronized PooledExecutor getThreadPool() {
        if (_pool == null) {
            _pool = new ThreadPool(getThreadGroup(), getDisplayName(),
                                   THREAD_POOL_SIZE);
        }
        return _pool;
    }

    /**
     * Determines whether this is a client-side connection, i.e. one
     * constructed without an authenticator.
     *
     * @return <code>true</code> if no authenticator is set
     */
    protected boolean isClient() {
        return (_authenticator == null);
    }

    /**
     * Returns the caller passed to each invocation.
     *
     * @return the caller, or <code>null</code> if it has not been assigned
     *         (it is only assigned for server connections, by
     *         {@link #setInvocationHandler})
     */
    protected Caller getCaller() {
        return _caller;
    }

    /**
     * Returns the thread group for this connection's threads, creating it
     * on first use with {@link #getDisplayName()} as its name.
     *
     * @return the thread group
     */
    protected synchronized ThreadGroup getThreadGroup() {
        if (_group == null) {
            _group = new ThreadGroup(getDisplayName());
        }
        return _group;
    }

    /**
     * Builds a display name for this connection: the remote URI, or
     * <code>&lt;unknown&gt;</code> if it is unavailable, followed by
     * <code>[client]</code> or <code>[server]</code>.
     *
     * @return the display name
     */
    protected String getDisplayName() {
        StringBuffer name = new StringBuffer();
        URI uri = null;
        try {
            uri = getRemoteURI();
        } catch (ResourceException ignore) {
            // the name is purely descriptive: fall back to "<unknown>"
            if (_log.isDebugEnabled()) {
                _log.debug("Failed to determine remote URI", ignore);
            }
        }
        name.append((uri != null) ? uri.toString() : "<unknown>");
        name.append(isClient() ? "[client]" : "[server]");
        return name.toString();
    }

}