KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > hudson > remoting > Request


package hudson.remoting;

import java.io.IOException;
import java.io.Serializable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger;
11 /**
12  * Request/response pattern over {@link Channel}, the layer-1 service.
13  *
14  * <p>
15  * This assumes that the receiving side has all the class definitions
16  * available to de-serialize {@link Request}, just like {@link Command}.
17  *
18  * @author Kohsuke Kawaguchi
19  * @see Response
20  */

21 abstract class Request<RSP extends Serializable JavaDoc,EXC extends Throwable JavaDoc> extends Command {
22
23     /**
24      * Executed on a remote system to perform the task.
25      *
26      * @param channel
27      * The local channel. From the view point of the JVM that
28      * {@link #call(Channel) made the call}, this channel is
29      * the remote channel.
30      * @return
31      * the return value will be sent back to the calling process.
32      * @throws EXC
33      * The exception will be forwarded to the calling process.
34      * If no checked exception is supposed to be thrown, use {@link RuntimeException}.
35      */

36     protected abstract RSP perform(Channel channel) throws EXC;
37
38     /**
39      * Uniquely identifies this request.
40      * Used for correlation between request and response.
41      */

42     private final int id;
43
44     private volatile Response<RSP,EXC> response;
45
46     protected Request() {
47         synchronized(Request.class) {
48             id = nextId++;
49         }
50     }
51
52     /**
53      * Sends this request to a remote system, and blocks until we receives a response.
54      *
55      * @param channel
56      * The channel from which the request will be sent.
57      * @throws InterruptedException
58      * If the thread is interrupted while it's waiting for the call to complete.
59      * @throws IOException
60      * If there's an error during the communication.
61      * @throws RequestAbortedException
62      * If the channel is terminated while the call is in progress.
63      * @throws EXC
64      * If the {@link #perform(Channel)} throws an exception.
65      */

66     public final RSP call(Channel channel) throws EXC, InterruptedException JavaDoc, IOException JavaDoc {
67         // Channel.send() locks channel, and there are other call sequences
68
// ( like Channel.terminate()->Request.abort()->Request.onCompleted() )
69
// that locks channel -> request, so lock objects in the same order
70
synchronized(channel) {
71             synchronized(this) {
72                 response=null;
73
74                 channel.pendingCalls.put(id,this);
75                 channel.send(this);
76             }
77         }
78
79         synchronized(this) {
80             while(response==null)
81                 wait(); // wait until the response arrives
82

83             Object JavaDoc exc = response.exception;
84
85             if(exc !=null)
86                 throw (EXC)exc; // some versions of JDK fails to compile this line. If so, upgrade your JDK.
87

88             return response.returnValue;
89         }
90     }
91
92     /**
93      * Makes an invocation but immediately returns without waiting for the completion
94      * (AKA asynchronous invocation.)
95      *
96      * @param channel
97      * The channel from which the request will be sent.
98      * @return
99      * The {@link Future} object that can be used to wait for the completion.
100      * @throws IOException
101      * If there's an error during the communication.
102      */

103     public final Future<RSP> callAsync(Channel channel) throws IOException JavaDoc {
104         response=null;
105
106         channel.pendingCalls.put(id,this);
107         channel.send(this);
108
109         return new Future<RSP>() {
110             /**
111              * The task cannot be cancelled.
112              */

113             public boolean cancel(boolean mayInterruptIfRunning) {
114                 return false;
115             }
116
117             public boolean isCancelled() {
118                 return false;
119             }
120
121             public boolean isDone() {
122                 return response!=null;
123             }
124
125             public RSP get() throws InterruptedException JavaDoc, ExecutionException JavaDoc {
126                 synchronized(Request.this) {
127                     while(response==null)
128                         Request.this.wait(); // wait until the response arrives
129

130                     if(response.exception!=null)
131                         throw new ExecutionException JavaDoc(response.exception);
132
133                     return response.returnValue;
134                 }
135             }
136
137             public RSP get(long timeout, TimeUnit JavaDoc unit) throws InterruptedException JavaDoc, ExecutionException JavaDoc, TimeoutException JavaDoc {
138                 synchronized(Request.this) {
139                     if(response==null)
140                         Request.this.wait(unit.toMillis(timeout)); // wait until the response arrives
141
if(response==null)
142                         throw new TimeoutException JavaDoc();
143
144                     if(response.exception!=null)
145                         throw new ExecutionException JavaDoc(response.exception);
146
147                     return response.returnValue;
148                 }
149             }
150         };
151     }
152
153
154     /**
155      * Called by the {@link Response} when we received it.
156      */

157     /*package*/ synchronized void onCompleted(Response<RSP,EXC> response) {
158         this.response = response;
159         notify();
160     }
161
162     /**
163      * Aborts the processing. The calling thread will receive an exception.
164      */

165     /*package*/ void abort(IOException JavaDoc e) {
166         onCompleted(new Response(id,new RequestAbortedException(e)));
167     }
168
169     /**
170      * Schedules the execution of this request.
171      */

172     protected final void execute(final Channel channel) {
173         channel.executor.execute(new Runnable JavaDoc() {
174             public void run() {
175                 try {
176                     RSP rsp;
177                     try {
178                         rsp = Request.this.perform(channel);
179                     } catch (Throwable JavaDoc t) {
180                         // error return
181
channel.send(new Response<RSP,Throwable JavaDoc>(id,t));
182                         return;
183                     }
184                     // normal completion
185
channel.send(new Response<RSP,EXC>(id,rsp));
186                 } catch (IOException JavaDoc e) {
187                     // communication error.
188
// this means the caller will block forever
189
logger.log(Level.SEVERE, "Failed to send back a reply",e);
190                 }
191             }
192         });
193     }
194
195     /**
196      * Next request ID.
197      */

198     private static int nextId=0;
199
200     private static final long serialVersionUID = 1L;
201
202     private static final Logger JavaDoc logger = Logger.getLogger(Request.class.getName());
203
204     //private static final Unsafe unsafe = getUnsafe();
205

206     //private static Unsafe getUnsafe() {
207
// try {
208
// Field f = Unsafe.class.getDeclaredField("theUnsafe");
209
// f.setAccessible(true);
210
// return (Unsafe)f.get(null);
211
// } catch (NoSuchFieldException e) {
212
// throw new Error(e);
213
// } catch (IllegalAccessException e) {
214
// throw new Error(e);
215
// }
216
//}
217
}
218
Popular Tags