KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > exolab > jms > net > multiplexer > MultiplexedManagedConnection


1 /**
2  * Redistribution and use of this software and associated documentation
3  * ("Software"), with or without modification, are permitted provided
4  * that the following conditions are met:
5  *
6  * 1. Redistributions of source code must retain copyright
7  * statements and notices. Redistributions must also contain a
8  * copy of this document.
9  *
10  * 2. Redistributions in binary form must reproduce the
11  * above copyright notice, this list of conditions and the
12  * following disclaimer in the documentation and/or other
13  * materials provided with the distribution.
14  *
15  * 3. The name "Exolab" must not be used to endorse or promote
16  * products derived from this Software without prior written
17  * permission of Exoffice Technologies. For written permission,
18  * please contact info@exolab.org.
19  *
20  * 4. Products derived from this Software may not be called "Exolab"
21  * nor may "Exolab" appear in their names without prior written
22  * permission of Exoffice Technologies. Exolab is a registered
23  * trademark of Exoffice Technologies.
24  *
25  * 5. Due credit should be given to the Exolab Project
26  * (http://www.exolab.org/).
27  *
28  * THIS SOFTWARE IS PROVIDED BY EXOFFICE TECHNOLOGIES AND CONTRIBUTORS
29  * ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES, INCLUDING, BUT
30  * NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
31  * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
32  * EXOFFICE TECHNOLOGIES OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
33  * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
34  * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
35  * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
36  * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
37  * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
38  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
39  * OF THE POSSIBILITY OF SUCH DAMAGE.
40  *
41  * Copyright 2004-2005 (C) Exoffice Technologies Inc. All Rights Reserved.
42  *
43  * $Id: MultiplexedManagedConnection.java,v 1.7 2005/06/04 14:28:53 tanderson Exp $
44  */

45 package org.exolab.jms.net.multiplexer;
46
47 import java.io.IOException JavaDoc;
48 import java.security.Principal JavaDoc;
49
50 import org.apache.commons.logging.Log;
51 import org.apache.commons.logging.LogFactory;
52 import EDU.oswego.cs.dl.util.concurrent.PooledExecutor;
53
54 import org.exolab.jms.net.connector.AbstractManagedConnection;
55 import org.exolab.jms.net.connector.Authenticator;
56 import org.exolab.jms.net.connector.Caller;
57 import org.exolab.jms.net.connector.CallerImpl;
58 import org.exolab.jms.net.connector.ConnectException;
59 import org.exolab.jms.net.connector.Connection;
60 import org.exolab.jms.net.connector.IllegalStateException;
61 import org.exolab.jms.net.connector.InvocationHandler;
62 import org.exolab.jms.net.connector.Request;
63 import org.exolab.jms.net.connector.ResourceException;
64 import org.exolab.jms.net.connector.Response;
65 import org.exolab.jms.net.connector.SecurityException;
66 import org.exolab.jms.net.uri.URI;
67 import org.exolab.jms.net.util.ThreadPool;
68
69
70 /**
71  * A <code>ManagedConnection</code> that uses a {@link Multiplexer} to multiplex
72  * data over an {@link Endpoint}
73  *
74  * @author <a HREF="mailto:tma@netspace.net.au">Tim Anderson</a>
75  * @version $Revision: 1.7 $ $Date: 2005/06/04 14:28:53 $
76  */

77 public abstract class MultiplexedManagedConnection
78         extends AbstractManagedConnection
79         implements MultiplexerListener {
80
81     /**
82      * The multiplexer.
83      */

84     private Multiplexer _multiplexer;
85
86     /**
87      * The thread used to run {@link #_multiplexer}.
88      */

89     private Thread JavaDoc _multiplexThread;
90
91     /**
92      * The endpoint to multiplex data over.
93      */

94     private Endpoint _endpoint;
95
96     /**
97      * The invocation handler.
98      */

99     private InvocationHandler _invoker;
100
101     /**
102      * The security principal.
103      */

104     private Principal JavaDoc _principal;
105
106     /**
107      * The connection authenticator, for server side instances.
108      */

109     private Authenticator _authenticator;
110
111     /**
112      * Cached caller instance. Non-null if this is a server-side instance.
113      */

114     private Caller _caller;
115
116     /**
117      * The thread pool for handling invocation requests.
118      */

119     private PooledExecutor _pool;
120
121     /**
122      * Determines if the thread pool is shared amongst multiple managed
123      * connections. If so, this isn't responsible for cleaning it up.
124      */

125     private final boolean _sharedPool;
126
127     /**
128      * The thread group to associate any allocated threads with.
129      */

130     private ThreadGroup JavaDoc _group;
131
132     /**
133      * The maximum no. of threads to use at any one time.
134      */

135     private static final int THREAD_POOL_SIZE = 10;
136
137     /**
138      * The logger.
139      */

140     private static final Log _log =
141             LogFactory.getLog(MultiplexedManagedConnection.class);
142
143
144     /**
145      * Construct a new client <code>MultiplexedManagedConnection</code>.
146      *
147      * @param principal the security principal. May be <code>null</code>
148      */

149     public MultiplexedManagedConnection(Principal JavaDoc principal) {
150         _principal = principal;
151         _sharedPool = false;
152     }
153
154     /**
155      * Construct a new server <code>MultiplexedManagedConnection</code>.
156      *
157      * @param authenticator the connection authenticator
158      * @param pool the thread pool for handling invocation requests.
159      * May be <code>null</code>
160      */

161     public MultiplexedManagedConnection(Authenticator authenticator,
162                                         PooledExecutor pool) {
163         if (authenticator == null) {
164             throw new IllegalArgumentException JavaDoc(
165                     "Argument 'authenticator' is null");
166         }
167         _authenticator = authenticator;
168         if (pool != null) {
169             _pool = pool;
170             _sharedPool = true;
171         } else {
172             _sharedPool = false;
173         }
174     }
175
176     /**
177      * Registers a handler for handling invocations on objects exported via this
178      * connection. Once a handler is registered, it cannot be de-registered.
179      *
180      * @param handler the invocation handler
181      * @throws IllegalStateException if a handler is already registered
182      * @throws ResourceException for any error
183      */

184     public void setInvocationHandler(InvocationHandler handler)
185             throws ResourceException {
186         if (_invoker != null) {
187             throw new IllegalStateException JavaDoc(
188                     "An invocation handler is already registered");
189         }
190         _invoker = handler;
191         try {
192             _endpoint = createEndpoint();
193             if (isClient()) {
194                 _multiplexer = createMultiplexer(_endpoint, _principal,
195                                                  getThreadPool());
196             } else {
197                 _multiplexer = createMultiplexer(_endpoint, _authenticator,
198                                                  getThreadPool());
199                 _principal = _multiplexer.getPrincipal();
200                 _caller = new CallerImpl(getRemoteURI(), getLocalURI());
201             }
202             String JavaDoc name = getDisplayName() + "-Multiplexer";
203             _multiplexThread = new Thread JavaDoc(getThreadGroup(), _multiplexer,
204                                           name);
205             _multiplexThread.start();
206         } catch (IOException JavaDoc exception) {
207             throw new ConnectException("Failed to start multiplexer",
208                                        exception);
209         }
210     }
211
212     /**
213      * Creates a new connection handle for the underlying physical connection.
214      *
215      * @return a new connection handle
216      * @throws IllegalStateException if an invocation handler hasn't been
217      * registered
218      */

219     public synchronized Connection getConnection()
220             throws IllegalStateException JavaDoc {
221         if (_invoker == null) {
222             throw new IllegalStateException JavaDoc("No InvocationHandler registered");
223         }
224         return new MultiplexedConnection(this);
225     }
226
227     /**
228      * Determines if the underlying physical connection is alive.
229      *
230      * @return <code>true</code> if the connection is alive
231      */

232     public boolean isAlive() {
233         boolean alive = false;
234         Multiplexer multiplexer;
235         synchronized (this) {
236             multiplexer = _multiplexer;
237         }
238         if (multiplexer != null) {
239             Channel channel = null;
240             try {
241                 channel = multiplexer.getChannel();
242                 channel.ping();
243                 alive = true;
244                 channel.release();
245             } catch (IOException JavaDoc exception) {
246                 _log.debug("Failed to ping", exception);
247                 if (channel != null) {
248                     channel.destroy();
249                 }
250             }
251         }
252         return alive;
253     }
254
255     /**
256      * Destroys the physical connection.
257      *
258      * @throws ResourceException for any error
259      */

260     public void destroy() throws ResourceException {
261         if (!_sharedPool && _pool != null) {
262             _pool.shutdownAfterProcessingCurrentlyQueuedTasks();
263         }
264
265         Multiplexer multiplexer;
266         Thread JavaDoc thread;
267         Endpoint endpoint;
268
269         synchronized (this) {
270             multiplexer = _multiplexer;
271             thread = _multiplexThread;
272             endpoint = _endpoint;
273         }
274         try {
275             if (multiplexer != null) {
276                 // multiplexer handles endpoint closure
277
multiplexer.close();
278                 if (thread != Thread.currentThread()) {
279                     try {
280                         // wait for the multiplexer thread to terminate
281
thread.join();
282                     } catch (InterruptedException JavaDoc exception) {
283                         _log.debug(exception);
284                     }
285                 }
286             } else {
287                 if (endpoint != null) {
288                     try {
289                         endpoint.close();
290                     } catch (IOException JavaDoc exception) {
291                         throw new ResourceException("Failed to close endpoint",
292                                                     exception);
293                     }
294                 }
295             }
296         } finally {
297             synchronized (this) {
298                 _multiplexer = null;
299                 _multiplexThread = null;
300                 _endpoint = null;
301             }
302         }
303     }
304
305     /**
306      * Determines if the security principal that owns this connection is the
307      * same as that supplied.
308      * <p/>
309      * NOTE: If this is a server-side instance, the principal is only available
310      * once the connection has been established, by {@link
311      * #setInvocationHandler}
312      *
313      * @param principal the principal to compare. May be <code>null</code>.
314      * @return <code>true</code> if the principal that owns this connection is
315      * the same as <code>principal</code>
316      */

317     public boolean hasPrincipal(Principal JavaDoc principal) {
318         boolean result = false;
319         if ((_principal != null && _principal.equals(principal))
320                 || (_principal == null && principal == null)) {
321             result = true;
322         }
323         return result;
324     }
325
326     /**
327      * Invoked for an invocation request.
328      *
329      * @param channel the channel the invocation is on
330      */

331     public void request(Channel channel) {
332         _invoker.invoke(new ChannelInvocation(channel, getCaller()));
333     }
334
335     /**
336      * Invoked when the connection is closed by the peer.
337      */

338     public void closed() {
339         notifyClosed();
340     }
341
342     /**
343      * Invoked when an error occurs on the multiplexer.
344      *
345      * @param error the error
346      */

347     public void error(Throwable JavaDoc error) {
348         notifyError(error);
349     }
350
351     /**
352      * Invoke a method on a remote object.
353      *
354      * @param connection the connection invoking the request
355      * @param request the request
356      * @return the response
357      */

358     protected Response invoke(Connection connection, Request request) {
359         Response response;
360         Multiplexer multiplexer;
361         synchronized (this) {
362             multiplexer = _multiplexer;
363         }
364         if (multiplexer != null) {
365             Channel channel = null;
366             try {
367                 channel = multiplexer.getChannel();
368                 response = channel.invoke(request);
369                 channel.release();
370             } catch (Exception JavaDoc exception) {
371                 _log.debug(exception, exception);
372                 response = new Response(exception);
373                 if (channel != null) {
374                     channel.destroy();
375                 }
376             }
377         } else {
378             response = new Response(new ResourceException("Connection lost"));
379         }
380
381         return response;
382     }
383
384     /**
385      * Creates the endpoint to multiplex data over.
386      *
387      * @return the endpoint to multiplex data over
388      * @throws IOException for any I/O error
389      */

390     protected abstract Endpoint createEndpoint() throws IOException JavaDoc;
391
392     /**
393      * Create a new client-side multiplexer.
394      *
395      * @param endpoint the endpoint to multiplex messages over
396      * @param principal the security principal
397      * @param pool thread pool for handling invocation requests
398      * @return a new client-side multiplexer
399      * @throws IOException if an I/O error occurs
400      * @throws SecurityException if connection is refused by the server
401      */

402     protected Multiplexer createMultiplexer(Endpoint endpoint,
403                                             Principal JavaDoc principal,
404                                             PooledExecutor pool)
405             throws IOException JavaDoc, SecurityException JavaDoc {
406         return new Multiplexer(this, endpoint, principal, pool);
407     }
408
409     /**
410      * Create a new server-side multiplexer.
411      *
412      * @param endpoint the endpoint to multiplex messages over
413      * @param authenticator the connection authetnicator
414      * @param pool thread pool for handling invocation requests
415      * @return a new server-side multiplexer
416      * @throws IOException if an I/O error occurs
417      * @throws ResourceException if the authenticator cannot authenticate
418      */

419     protected Multiplexer createMultiplexer(Endpoint endpoint,
420                                             Authenticator authenticator,
421                                             PooledExecutor pool)
422             throws IOException JavaDoc, ResourceException {
423         return new Multiplexer(this, endpoint, authenticator, pool);
424     }
425
426     /**
427      * Returns the thread pool to handle invocation requests.
428      *
429      * @return the thread pool to handle invocation requests
430      */

431     protected synchronized PooledExecutor getThreadPool() {
432         if (_pool == null) {
433             _pool = new ThreadPool(getThreadGroup(), getDisplayName(),
434                                    THREAD_POOL_SIZE);
435         }
436         return _pool;
437     }
438
439     /**
440      * Helper to determine if this is a client-side or server side instance.
441      *
442      * @return <code>true</code> if this is a client-side instance, otherwise
443      * <code>false</code>
444      */

445     protected boolean isClient() {
446         return (_authenticator == null);
447     }
448
449     /**
450      * Helper to return an {@link Caller} instance, denoting the client
451      * performing a method invocation. Only applicable for server-side, and only
452      * after the multiplexer has been created.
453      *
454      * @return the caller instance, or <code>null</code> if it hasn't been
455      * initialised
456      */

457     protected Caller getCaller() {
458         return _caller;
459     }
460
461     /**
462      * Returns the thread group to associate with allocated threads.
463      *
464      * @return the thread group to associate with allocated threads, or
465      * <code>null</code> to use the default thread group.
466      */

467     protected synchronized ThreadGroup JavaDoc getThreadGroup() {
468         if (_group == null) {
469             _group = new ThreadGroup JavaDoc(getDisplayName());
470         }
471         return _group;
472     }
473
474     /**
475      * Helper to generate a descriptive name, for display purposes.
476      * <p/>
477      * This implementation returns the remote URI, concatenated with "[client]"
478      * if this is a client connection, or "[server]" if it is a server
479      * connection.
480      *
481      * @return the display name
482      */

483     protected String JavaDoc getDisplayName() {
484         StringBuffer JavaDoc name = new StringBuffer JavaDoc();
485         URI uri = null;
486         try {
487             uri = getRemoteURI();
488         } catch (ResourceException ignore) {
489             if (_log.isDebugEnabled()) {
490                 _log.debug("Failed to determine remote URI", ignore);
491             }
492         }
493         if (uri != null) {
494             name.append(uri.toString());
495         } else {
496             name.append("<unknown>");
497         }
498         if (isClient()) {
499             name.append("[client]");
500         } else {
501             name.append("[server]");
502         }
503         return name.toString();
504     }
505
506 }
507
Popular Tags