// KickJava   Java API By Example, From Geeks To Geeks.
//
// Java > Open Source Codes > hudson > remoting > Channel


package hudson.remoting;

import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.OutputStream;
import java.lang.reflect.Proxy;
import java.util.Hashtable;
import java.util.Map;
import java.util.Vector;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.logging.Level;
import java.util.logging.Logger;

18 /**
19  * Represents a communication channel to the remote peer.
20  *
21  * <p>
22  * A {@link Channel} is a mechanism for two JVMs to communicate over
23  * bi-directional {@link InputStream}/{@link OutputStream} pair.
24  * {@link Channel} represents an endpoint of the stream, and thus
25  * two {@link Channel}s are always used in a pair.
26  *
27  * <p>
28  * Communication is established as soon as two {@link Channel} instances
29  * are created at the end fo the stream pair
30  * until the stream is terminated via {@link #close()}.
31  *
32  * <p>
33  * The basic unit of remoting is an executable {@link Callable} object.
34  * An application can create a {@link Callable} object, and execute it remotely
35  * by using the {@link #call(Callable)} method or {@link #callAsync(Callable)} method.
36  *
37  * <p>
38  * In this sense, {@link Channel} is a mechanism to delegate/offload computation
39  * to other JVMs and somewhat like an agent system. This is bit different from
40  * remoting technologies like CORBA or web services, where the server exposes a
41  * certain functionality that clients invoke.
42  *
43  * <p>
44  * {@link Callable} object, as well as the return value / exceptions,
45  * are transported by using Java serialization. All the necessary class files
46  * are also shipped over {@link Channel} on-demand, so there's no need to
47  * pre-deploy such classes on both JVMs.
48  *
49  *
50  * <h2>Implementor's Note</h2>
51  * <p>
52  * {@link Channel} builds its features in a layered model. Its higher-layer
53  * features are built on top of its lower-layer features, and they
54  * are called layer-0, layer-1, etc.
55  *
56  * <ul>
57  * <li>
58  * <b>Layer 0</b>:
59  * See {@link Command} for more details. This is for higher-level features,
60  * and not likely useful for applications directly.
61  * <li>
62  * <b>Layer 1</b>:
63  * See {@link Request} for more details. This is for higher-level features,
64  * and not likely useful for applications directly.
65  * </ul>
66  *
67  * @author Kohsuke Kawaguchi
68  */

69 public class Channel implements VirtualChannel {
70     private final ObjectInputStream JavaDoc ois;
71     private final ObjectOutputStream JavaDoc oos;
72     private final String JavaDoc name;
73     /*package*/ final Executor JavaDoc executor;
74
75     /**
76      * If true, this data channel is already closed and
77      * no further calls are accepted.
78      */

79     private volatile boolean closed = false;
80
81     private volatile boolean closing = false;
82
83     /*package*/ final Map JavaDoc<Integer JavaDoc,Request<?,?>> pendingCalls = new Hashtable JavaDoc<Integer JavaDoc,Request<?,?>>();
84
85     /**
86      * {@link ClassLoader}s that are proxies of the remote classloaders.
87      */

88     /*package*/ final ImportedClassLoaderTable importedClassLoaders = new ImportedClassLoaderTable(this);
89
90     /**
91      * Objects exported via {@link #export(Class, Object)}.
92      */

93     private final ExportTable<Object JavaDoc> exportedObjects = new ExportTable<Object JavaDoc>();
94
95     /**
96      * Registered listeners.
97      */

98     private final Vector JavaDoc<Listener> listeners = new Vector JavaDoc<Listener>();
99
100     public Channel(String JavaDoc name, Executor JavaDoc exec, InputStream JavaDoc is, OutputStream JavaDoc os) throws IOException JavaDoc {
101         this(name,exec,is,os,null);
102     }
103
104     /**
105      * Creates a new channel.
106      *
107      * @param name
108      * Human readable name of this channel. Used for debug/logging. Can be anything.
109      * @param exec
110      * Commands sent from the remote peer will be executed by using this {@link Executor}.
111      * @param is
112      * Stream connected to the remote peer.
113      * @param os
114      * Stream connected to the remote peer.
115      * @param header
116      * If non-null, receive the portion of data in <tt>is</tt> before
117      * the data goes into the "binary mode". This is useful
118      * when the established communication channel might include some data that might
119      * be useful for debugging/trouble-shooting.
120      */

121     public Channel(String JavaDoc name, Executor JavaDoc exec, InputStream JavaDoc is, OutputStream JavaDoc os, OutputStream JavaDoc header) throws IOException JavaDoc {
122         this.name = name;
123         this.executor = exec;
124
125         // write the magic preamble.
126
// certain communication channel, such as forking JVM via ssh,
127
// may produce some garbage at the beginning (for example a remote machine
128
// might print some warning before the program starts outputting its own data.)
129
//
130
// so use magic preamble and discard all the data up to that to improve robustness.
131
os.write(new byte[]{0,0,0,0}); // preamble
132
this.oos = new ObjectOutputStream JavaDoc(os);
133         oos.flush(); // make sure that stream header is sent to the other end. avoids dead-lock
134

135         {// read the input until we hit preamble
136
int ch;
137             int count=0;
138
139             while(true) {
140                 ch = is.read();
141                 if(ch==-1) {
142                     throw new EOFException JavaDoc("unexpected stream termination");
143                 }
144                 if(ch==0) {
145                     count++;
146                     if(count==4) break;
147                 } else {
148                     if(header!=null)
149                         header.write(ch);
150                     count=0;
151                 }
152             }
153         }
154
155         this.ois = new ObjectInputStream JavaDoc(is);
156         new ReaderThread(name).start();
157     }
158
159     /**
160      * Callback "interface" for changes in the state of {@link Channel}.
161      */

162     public static abstract class Listener {
163         /**
164          * When the channel was closed normally or abnormally due to an error.
165          *
166          * @param cause
167          * if the channel is closed abnormally, this parameter
168          * represents an exception that has triggered it.
169          */

170         public void onClosed(Channel channel, IOException JavaDoc cause) {}
171     }
172
173     /**
174      * Sends a command to the remote end and executes it there.
175      *
176      * <p>
177      * This is the lowest layer of abstraction in {@link Channel}.
178      * {@link Command}s are executed on a remote system in the order they are sent.
179      */

180     /*package*/ synchronized void send(Command cmd) throws IOException JavaDoc {
181         if(closed)
182             throw new IOException JavaDoc("already closed");
183         if(logger.isLoggable(Level.FINE))
184             logger.fine("Send "+cmd);
185         Channel old = Channel.setCurrent(this);
186         try {
187             oos.writeObject(cmd);
188             oos.flush(); // make sure the command reaches the other end.
189
} finally {
190             Channel.setCurrent(old);
191         }
192         oos.reset();
193     }
194
195     /**
196      * {@inheritDoc}
197      */

198     public <T> T export(Class JavaDoc<T> type, T instance) {
199         return export(type,instance,true);
200     }
201
202     /**
203      * @param userProxy
204      * If true, the returned proxy will be capable to handle classes
205      * defined in the user classloader as parameters and return values.
206      * Such proxy relies on {@link RemoteClassLoader} and related mechanism,
207      * so it's not usable for implementing lower-layer services that are
208      * used by {@link RemoteClassLoader}.
209      *
210      * To create proxies for objects inside remoting, pass in false.
211      */

212     /*package*/ <T> T export(Class JavaDoc<T> type, T instance, boolean userProxy) {
213         if(instance==null)
214             return null;
215
216         // proxy will unexport this instance when it's GC-ed on the remote machine.
217
final int id = export(instance);
218         return type.cast(Proxy.newProxyInstance( type.getClassLoader(), new Class JavaDoc[]{type},
219             new RemoteInvocationHandler(id,userProxy)));
220     }
221
222     /*package*/ int export(Object JavaDoc instance) {
223         return exportedObjects.export(instance);
224     }
225
226     /*package*/ Object JavaDoc getExportedObject(int oid) {
227         return exportedObjects.get(oid);
228     }
229
230     /*package*/ void unexport(int id) {
231         exportedObjects.unexport(id);
232     }
233
234     /**
235      * {@inheritDoc}
236      */

237     public <V,T extends Throwable JavaDoc>
238     V call(Callable<V,T> callable) throws IOException JavaDoc, T, InterruptedException JavaDoc {
239         try {
240             UserResponse<V,T> r = new UserRequest<V,T>(this, callable).call(this);
241             return r.retrieve(this, UserRequest.getClassLoader(callable));
242
243         // re-wrap the exception so that we can capture the stack trace of the caller.
244
} catch (ClassNotFoundException JavaDoc e) {
245             IOException JavaDoc x = new IOException JavaDoc("Remote call failed");
246             x.initCause(e);
247             throw x;
248         } catch (Error JavaDoc e) {
249             IOException JavaDoc x = new IOException JavaDoc("Remote call failed");
250             x.initCause(e);
251             throw x;
252         }
253     }
254
255     /**
256      * {@inheritDoc}
257      */

258     public <V,T extends Throwable JavaDoc>
259     Future<V> callAsync(final Callable<V,T> callable) throws IOException JavaDoc {
260         final Future<UserResponse<V,T>> f = new UserRequest<V,T>(this, callable).callAsync(this);
261         return new FutureAdapter<V,UserResponse<V,T>>(f) {
262             protected V adapt(UserResponse<V,T> r) throws ExecutionException JavaDoc {
263                 try {
264                     return r.retrieve(Channel.this, UserRequest.getClassLoader(callable));
265                 } catch (Throwable JavaDoc t) {// really means catch(T t)
266
throw new ExecutionException JavaDoc(t);
267                 }
268             }
269         };
270     }
271
272     /**
273      * Aborts the connection in response to an error.
274      */

275     private synchronized void terminate(IOException JavaDoc e) {
276         closed = true;
277         try {
278             synchronized(pendingCalls) {
279                 for (Request<?,?> req : pendingCalls.values())
280                     req.abort(e);
281                 pendingCalls.clear();
282             }
283         } finally {
284             notifyAll();
285
286             for (Listener l : listeners.toArray(new Listener[listeners.size()]))
287                 l.onClosed(this,e);
288         }
289     }
290
291     /**
292      * Registers a new {@link Listener}.
293      *
294      * @see #removeListener(Listener)
295      */

296     public void addListener(Listener l) {
297         listeners.add(l);
298     }
299
300     /**
301      * Removes a listener.
302      *
303      * @return
304      * false if the given listener has not been registered to begin with.
305      */

306     public boolean removeListener(Listener l) {
307         return listeners.remove(l);
308     }
309
310     /**
311      * Waits for this {@link Channel} to be closed down.
312      *
313      * The close-down of a {@link Channel} might be initiated locally or remotely.
314      *
315      * @throws InterruptedException
316      * If the current thread is interrupted while waiting for the completion.
317      */

318     public synchronized void join() throws InterruptedException JavaDoc {
319         while(!closed)
320             wait();
321     }
322
323     /**
324      * Notifies the remote peer that we are closing down.
325      *
326      * Execution of this command also triggers the {@link ReaderThread} to shut down
327      * and quit. The {@link CloseCommand} is always the last command to be sent on
328      * {@link ObjectOutputStream}, and it's the last command to be read.
329      */

330     private static final class CloseCommand extends Command {
331         protected void execute(Channel channel) {
332             try {
333                 channel.close();
334             } catch (IOException JavaDoc e) {
335                 logger.log(Level.SEVERE,"close command failed on "+channel.name,e);
336                 logger.log(Level.INFO,"close command created at",createdAt);
337             }
338         }
339
340         public String JavaDoc toString() {
341             return "close";
342         }
343     }
344
345     /**
346      * {@inheritDoc}
347      */

348     public synchronized void close() throws IOException JavaDoc {
349         // make sure no other commands get executed in between.
350
if(closed) return;
351         if(closing) return;
352         closing=true;
353
354         send(new CloseCommand());
355         oos.close();
356
357         // TODO: would be nice if we can wait for the completion of pending requests
358
terminate(null);
359     }
360
361     private final class ReaderThread extends Thread JavaDoc {
362         public ReaderThread(String JavaDoc name) {
363             super("Channel reader thread: "+name);
364         }
365
366         public void run() {
367             try {
368                 while(!closed) {
369                     try {
370                         Command cmd = null;
371                         Channel old = Channel.setCurrent(Channel.this);
372                         try {
373                             cmd = (Command)ois.readObject();
374                         } finally {
375                             Channel.setCurrent(old);
376                         }
377                         if(logger.isLoggable(Level.FINE))
378                             logger.fine("Received "+cmd);
379                         cmd.execute(Channel.this);
380                     } catch (ClassNotFoundException JavaDoc e) {
381                         logger.log(Level.SEVERE, "Unable to read a command",e);
382                     }
383                 }
384                 ois.close();
385             } catch (IOException JavaDoc e) {
386                 logger.log(Level.SEVERE, "I/O error in channel "+name,e);
387                 terminate(e);
388             }
389         }
390     }
391
392     /*package*/ static Channel setCurrent(Channel channel) {
393         Channel old = CURRENT.get();
394         CURRENT.set(channel);
395         return old;
396     }
397
398     /**
399      * This method can be invoked during the serialization/deserialization of
400      * objects when they are transferred to the remote {@link Channel},
401      * as well as during {@link Callable#call()} is invoked.
402      *
403      * @return null
404      * if the calling thread is not performing serialization.
405      */

406     public static Channel current() {
407         return CURRENT.get();
408     }
409
410     /**
411      * Remembers the current "channel" associated for this thread.
412      */

413     private static final ThreadLocal JavaDoc<Channel> CURRENT = new ThreadLocal JavaDoc<Channel>();
414
415     private static final Logger JavaDoc logger = Logger.getLogger(Channel.class.getName());
416 }
417
// Popular Tags