KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > net > nutch > ipc > Client


1 /* Copyright (c) 2003 The Nutch Organization. All rights reserved. */
2 /* Use subject to the conditions in http://www.nutch.org/LICENSE.txt. */
3
4 package net.nutch.ipc;
5
6 import java.net.Socket JavaDoc;
7 import java.net.InetSocketAddress JavaDoc;
8 import java.net.SocketTimeoutException JavaDoc;
9
10 import java.io.IOException JavaDoc;
11 import java.io.EOFException JavaDoc;
12 import java.io.DataInputStream JavaDoc;
13 import java.io.DataOutputStream JavaDoc;
14 import java.io.BufferedInputStream JavaDoc;
15 import java.io.BufferedOutputStream JavaDoc;
16
17 import java.util.Hashtable JavaDoc;
18 import java.util.logging.Logger JavaDoc;
19 import java.util.logging.Level JavaDoc;
20
21 import net.nutch.util.LogFormatter;
22 import net.nutch.io.Writable;
23 import net.nutch.io.UTF8;
24
25 /** A client for an IPC service. IPC calls take a single {@link Writable} as a
26  * parameter, and return a {@link Writable} as their value. A service runs on
27  * a port and is defined by a parameter class and a value class.
28  *
29  * @author Doug Cutting
30  * @see Server
31  */

32 public class Client {
33   public static final Logger JavaDoc LOG =
34     LogFormatter.getLogger("net.nutch.ipc.Client");
35
36   private Hashtable JavaDoc connections = new Hashtable JavaDoc();
37
38   private Class JavaDoc valueClass; // class of call values
39
private int timeout = 10000; // timeout for calls
40
private int counter; // counter for call ids
41
private boolean running = true; // true while client runs
42

43   /** A call waiting for a value. */
44   private class Call {
45     int id; // call id
46
Writable param; // parameter
47
Writable value; // value, null if error
48
String JavaDoc error; // error, null if value
49

50     protected Call(Writable param) {
51       this.param = param;
52       synchronized (Client.this) {
53         this.id = counter++;
54       }
55     }
56
57     /** Called by the connection thread when the call is complete and the
58      * value or error string are available. Notifies by default. */

59     public synchronized void callComplete() {
60         notify(); // notify caller
61
}
62   }
63
64   /** Thread that reads responses and notifies callers. Each connection owns a
65    * socket connected to a remote address. Calls are multiplexed through this
66    * socket: responses may be delivered out of order. */

67   private class Connection extends Thread JavaDoc {
68     private InetSocketAddress JavaDoc address; // address of server
69
private Socket JavaDoc socket; // connected socket
70
private DataInputStream JavaDoc in;
71     private DataOutputStream JavaDoc out;
72     private Hashtable JavaDoc calls = new Hashtable JavaDoc(); // currently active calls
73

74     public Connection(InetSocketAddress JavaDoc address) throws IOException JavaDoc {
75       this.address = address;
76       this.socket = new Socket JavaDoc(address.getAddress(), address.getPort());
77       socket.setSoTimeout(timeout);
78       this.in = new DataInputStream JavaDoc
79         (new BufferedInputStream JavaDoc(socket.getInputStream()));
80       this.out = new DataOutputStream JavaDoc
81         (new BufferedOutputStream JavaDoc(socket.getOutputStream()));
82       this.setDaemon(true);
83       this.setName("Client connection to "
84                    + address.getAddress().getHostAddress()
85                    + ":" + address.getPort());
86     }
87
88     public void run() {
89       LOG.info(getName() + ": starting");
90       try {
91         while (running) {
92           int id;
93           try {
94             id = in.readInt(); // try to read an id
95
} catch (SocketTimeoutException JavaDoc e) {
96             continue;
97           }
98
99           if (LOG.isLoggable(Level.FINE))
100             LOG.fine(getName() + " got value #" + id);
101
102           Call call = (Call)calls.remove(new Integer JavaDoc(id));
103           boolean isError = in.readBoolean(); // read if error
104
if (isError) {
105             UTF8 utf8 = new UTF8();
106             utf8.readFields(in); // read error string
107
call.error = utf8.toString();
108             call.value = null;
109           } else {
110             Writable value = makeValue();
111             value.readFields(in); // read value
112
call.value = value;
113             call.error = null;
114           }
115           call.callComplete(); // deliver result to caller
116
}
117       } catch (EOFException JavaDoc eof) {
118           // This is what happens when the remote side goes down
119
} catch (Exception JavaDoc e) {
120         LOG.log(Level.INFO, getName() + " caught: " + e, e);
121       } finally {
122         close();
123       }
124     }
125
126     /** Initiates a call by sending the parameter to the remote server.
127      * Note: this is not called from the Connection thread, but by other
128      * threads.
129      */

130     public void sendParam(Call call) throws IOException JavaDoc {
131       boolean error = true;
132       try {
133         calls.put(new Integer JavaDoc(call.id), call);
134         synchronized (out) {
135           if (LOG.isLoggable(Level.FINE))
136             LOG.fine(getName() + " sending #" + call.id);
137           out.writeInt(call.id);
138           call.param.write(out);
139           out.flush();
140         }
141         error = false;
142       } finally {
143         if (error)
144           close(); // close on error
145
}
146     }
147
148     /** Close the connection and remove it from the pool. */
149     public void close() {
150       LOG.info(getName() + ": closing");
151       connections.remove(address); // remove connection
152
try {
153         socket.close(); // close socket
154
} catch (IOException JavaDoc e) {}
155     }
156
157   }
158
159   /** Call implementation used for parallel calls. */
160   private class ParallelCall extends Call {
161     private ParallelResults results;
162     private int index;
163     
164     public ParallelCall(Writable param, ParallelResults results, int index) {
165       super(param);
166       this.results = results;
167       this.index = index;
168     }
169
170     /** Deliver result to result collector. */
171     public void callComplete() {
172       results.callComplete(this);
173     }
174   }
175
176   /** Result collector for parallel calls. */
177   private static class ParallelResults {
178     private Writable[] values;
179     private int size;
180     private int count;
181
182     public ParallelResults(int size) {
183       this.values = new Writable[size];
184       this.size = size;
185     }
186
187     /** Collect a result. */
188     public synchronized void callComplete(ParallelCall call) {
189       values[call.index] = call.value; // store the value
190
count++; // count it
191
if (count == size) // if all values are in
192
notify(); // then notify waiting caller
193
}
194   }
195
/** Construct an IPC client whose values are of the given {@link Writable}
 * class.
 *
 * @param valueClass class that is instantiated for every value read from a
 * server; must not be null
 * @throws IllegalArgumentException if <code>valueClass</code> is null
 */
public Client(Class valueClass) {
  // Fail here rather than later inside a connection thread, where the
  // failure would only be logged and calls would appear to time out.
  if (valueClass == null)
    throw new IllegalArgumentException("valueClass must not be null");
  this.valueClass = valueClass;
}
201
202   /** Stop all threads related to this client. No further calls may be made
203    * using this client. */

204   public void stop() {
205     LOG.info("Stopping client");
206     try {
207       Thread.sleep(timeout); // let all calls complete
208
} catch (InterruptedException JavaDoc e) {}
209     running = false;
210   }
211
212   /** Sets the timeout used for network i/o. */
213   public void setTimeout(int timeout) { this.timeout = timeout; }
214
215   /** Make a call, passing <code>param</code>, to the IPC server running at
216    * <code>address</code>, returning the value. Throws exceptions if there are
217    * network problems or if the remote code threw an exception. */

218   public Writable call(Writable param, InetSocketAddress JavaDoc address)
219     throws IOException JavaDoc {
220     Connection connection = getConnection(address);
221     Call call = new Call(param);
222     synchronized (call) {
223       connection.sendParam(call); // send the parameter
224
try {
225         call.wait(timeout); // wait for the result
226
} catch (InterruptedException JavaDoc e) {}
227
228       if (call.error != null) {
229         throw new IOException JavaDoc(call.error);
230       } else if (call.value == null) {
231         throw new IOException JavaDoc("timed out waiting for response");
232       } else {
233         return call.value;
234       }
235     }
236   }
237
238   /** Makes a set of calls in parallel. Each parameter is sent to the
239    * corresponding address. When all values are available, or have timed out
240    * or errored, the collected results are returned in an array. The array
241    * contains nulls for calls that timed out or errored. */

242   public Writable[] call(Writable[] params, InetSocketAddress JavaDoc[] addresses)
243     throws IOException JavaDoc {
244     if (params.length == 0) return new Writable[0];
245
246     ParallelResults results = new ParallelResults(params.length);
247     synchronized (results) {
248       for (int i = 0; i < params.length; i++) {
249         ParallelCall call = new ParallelCall(params[i], results, i);
250         try {
251           Connection connection = getConnection(addresses[i]);
252           connection.sendParam(call); // send each parameter
253
} catch (IOException JavaDoc e) {
254           LOG.info("Calling "+addresses[i]+" caught: " + e); // log errors
255
results.size--; // wait for one fewer result
256
}
257       }
258       try {
259         results.wait(timeout); // wait for all results
260
} catch (InterruptedException JavaDoc e) {}
261
262       if (results.count == 0) {
263         throw new IOException JavaDoc("no responses");
264       } else {
265         return results.values;
266       }
267     }
268   }
269
270   /** Get a connection from the pool, or create a new one and add it to the
271    * pool. Connections to a given host/port are reused. */

272   private Connection getConnection(InetSocketAddress JavaDoc address)
273     throws IOException JavaDoc {
274     Connection connection;
275     synchronized (connections) {
276       connection = (Connection)connections.get(address);
277       if (connection == null) {
278         connection = new Connection(address);
279         connections.put(address, connection);
280         connection.start();
281       }
282     }
283     return connection;
284   }
285
286   private Writable makeValue() {
287     Writable value; // construct value
288
try {
289       value = (Writable)valueClass.newInstance();
290     } catch (InstantiationException JavaDoc e) {
291       throw new RuntimeException JavaDoc(e.toString());
292     } catch (IllegalAccessException JavaDoc e) {
293       throw new RuntimeException JavaDoc(e.toString());
294     }
295     return value;
296   }
297
298 }
299
Popular Tags