KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > net > nutch > ipc > Server


1 /* Copyright (c) 2003 The Nutch Organization. All rights reserved. */
2 /* Use subject to the conditions in http://www.nutch.org/LICENSE.txt. */
3
4 package net.nutch.ipc;
5
6 import java.io.IOException JavaDoc;
7 import java.io.EOFException JavaDoc;
8 import java.io.DataInputStream JavaDoc;
9 import java.io.DataOutputStream JavaDoc;
10 import java.io.BufferedInputStream JavaDoc;
11 import java.io.BufferedOutputStream JavaDoc;
12
13 import java.net.Socket JavaDoc;
14 import java.net.ServerSocket JavaDoc;
15 import java.net.SocketTimeoutException JavaDoc;
16
17 import java.util.LinkedList JavaDoc;
18 import java.util.logging.Logger JavaDoc;
19 import java.util.logging.Level JavaDoc;
20
21 import net.nutch.util.LogFormatter;
22 import net.nutch.io.Writable;
23 import net.nutch.io.UTF8;
24
/** An abstract IPC service. IPC calls take a single {@link Writable} as a
 * parameter, and return a {@link Writable} as their value. A service runs on
 * a port and is defined by a parameter class and a value class.
 *
 * @author Doug Cutting
 * @see Client
 */

32 public abstract class Server {
33   public static final Logger JavaDoc LOG =
34     LogFormatter.getLogger("net.nutch.ipc.Server");
35
36   private int port; // port we listen on
37
private int handlerCount; // number of handler threads
38
private int maxQueuedCalls; // max number of queued calls
39
private Class JavaDoc paramClass; // class of call parameters
40

41   private int timeout = 10000; // timeout for i/o
42
private boolean running = true; // true while server runs
43
private LinkedList JavaDoc callQueue = new LinkedList JavaDoc(); // queued calls
44
private Object JavaDoc callDequeued = new Object JavaDoc(); // used by wait/notify
45

46   /** A call queued for handling. */
47   private static class Call {
48     private int id; // the client's call id
49
private Writable param; // the parameter passed
50
private Connection connection; // connection to client
51

52     public Call(int id, Writable param, Connection connection) {
53       this.id = id;
54       this.param = param;
55       this.connection = connection;
56     }
57   }
58
59   /** Listens on the socket, starting new connection threads. */
60   private class Listener extends Thread JavaDoc {
61     private ServerSocket JavaDoc socket;
62
63     public Listener() throws IOException JavaDoc {
64       this.socket = new ServerSocket JavaDoc(port);
65       socket.setSoTimeout(timeout);
66       this.setDaemon(true);
67       this.setName("Server listener on port " + port);
68     }
69
70     public void run() {
71       LOG.info(getName() + ": starting");
72       while (running) {
73         try {
74           new Connection(socket.accept()).start(); // start a new connection
75
} catch (SocketTimeoutException JavaDoc e) { // ignore timeouts
76
} catch (Exception JavaDoc e) { // log all other exceptions
77
LOG.log(Level.INFO, getName() + " caught: " + e, e);
78         }
79       }
80       try {
81         socket.close();
82       } catch (IOException JavaDoc e) {}
83       LOG.info(getName() + ": exiting");
84     }
85   }
86
87   /** Reads calls from a connection and queues them for handling. */
88   private class Connection extends Thread JavaDoc {
89     private Socket JavaDoc socket;
90     private DataInputStream JavaDoc in;
91     private DataOutputStream JavaDoc out;
92
93     public Connection(Socket JavaDoc socket) throws IOException JavaDoc {
94       this.socket = socket;
95       socket.setSoTimeout(timeout);
96       this.in = new DataInputStream JavaDoc
97         (new BufferedInputStream JavaDoc(socket.getInputStream()));
98       this.out = new DataOutputStream JavaDoc
99         (new BufferedOutputStream JavaDoc(socket.getOutputStream()));
100       this.setDaemon(true);
101       this.setName("Server connection on port " + port + " from "
102                    + socket.getInetAddress().getHostAddress());
103     }
104
105     public void run() {
106       LOG.info(getName() + ": starting");
107       try {
108         while (running) {
109           int id;
110           try {
111             id = in.readInt(); // try to read an id
112
} catch (SocketTimeoutException JavaDoc e) {
113             continue;
114           }
115         
116           if (LOG.isLoggable(Level.FINE))
117             LOG.fine(getName() + " got #" + id);
118         
119           Writable param = makeParam(); // read param
120
param.readFields(in);
121         
122           Call call = new Call(id, param, this);
123         
124           synchronized (callQueue) {
125             callQueue.addLast(call); // queue the call
126
callQueue.notify(); // wake up a waiting handler
127
}
128         
129           while (running && callQueue.size() >= maxQueuedCalls) {
130             synchronized (callDequeued) { // queue is full
131
callDequeued.wait(timeout); // wait for a dequeue
132
}
133           }
134         }
135       } catch (EOFException JavaDoc eof) {
136           // This is what happens when the other side shuts things down
137
} catch (Exception JavaDoc e) {
138         LOG.log(Level.INFO, getName() + " caught: " + e, e);
139       } finally {
140         try {
141           socket.close();
142         } catch (IOException JavaDoc e) {}
143         LOG.info(getName() + ": exiting");
144       }
145     }
146
147   }
148
149   /** Handles queued calls . */
150   private class Handler extends Thread JavaDoc {
151     public Handler() {
152       this.setDaemon(true);
153       this.setName("Server handler on " + port);
154     }
155
156     public void run() {
157       LOG.info(getName() + ": starting");
158       while (running) {
159         try {
160           Call call;
161           synchronized (callQueue) {
162             while (running && callQueue.size()==0) { // wait for a call
163
callQueue.wait(timeout);
164             }
165             if (!running) break;
166             call = (Call)callQueue.removeFirst(); // pop the queue
167
}
168
169           synchronized (callDequeued) { // tell others we've dequeued
170
callDequeued.notify();
171           }
172
173           if (LOG.isLoggable(Level.FINE))
174             LOG.fine(getName() + ": has #" + call.id + " from " +
175                      call.connection.socket.getInetAddress().getHostAddress());
176           
177           String JavaDoc error = null;
178           Writable value = null;
179           try {
180             value = call(call.param); // make the call
181
} catch (Exception JavaDoc e) {
182             LOG.log(Level.INFO, getName() + " call error: " + e, e);
183             error = e.toString();
184           }
185             
186           DataOutputStream JavaDoc out = call.connection.out;
187           synchronized (out) {
188             out.writeInt(call.id); // write call id
189
out.writeBoolean(error!=null); // write error flag
190
if (error != null)
191               value = new UTF8(error);
192             value.write(out); // write value
193
out.flush();
194           }
195
196         } catch (Exception JavaDoc e) {
197           LOG.log(Level.INFO, getName() + " caught: " + e, e);
198         }
199       }
200       LOG.info(getName() + ": exiting");
201     }
202   }
203   
204   /** Constructs a server listening on the named port. Parameters passed must
205    * be of the named class. The <code>handlerCount</handlerCount> determines
206    * the number of handler threads that will be used to process calls.
207    */

208   protected Server(int port, Class JavaDoc paramClass, int handlerCount) {
209     this.port = port;
210     this.paramClass = paramClass;
211     this.handlerCount = handlerCount;
212     this.maxQueuedCalls = handlerCount;
213   }
214
215   /** Sets the timeout used for network i/o. */
216   public void setTimeout(int timeout) { this.timeout = timeout; }
217
218   /** Starts the service. Must be called before any calls will be handled. */
219   public synchronized void start() throws IOException JavaDoc {
220     Listener listener = new Listener();
221     listener.start();
222     
223     for (int i = 0; i < handlerCount; i++) {
224       Handler handler = new Handler();
225       handler.start();
226     }
227   }
228
229   /** Stops the service. No calls will be handled after this is called. All
230    * threads will exit. */

231   public synchronized void stop() {
232     LOG.info("Stopping server on " + port);
233     running = false;
234     try {
235       Thread.sleep(timeout); // let all threads exit
236
} catch (InterruptedException JavaDoc e) {}
237     notify();
238   }
239
240   /** Wait for the server to be stopped. */
241   public synchronized void join() throws InterruptedException JavaDoc {
242     wait();
243   }
244
245   /** Called for each call. */
246   public abstract Writable call(Writable param) throws IOException JavaDoc;
247
248   
249   private Writable makeParam() {
250     Writable param; // construct param
251
try {
252       param = (Writable)paramClass.newInstance();
253     } catch (InstantiationException JavaDoc e) {
254       throw new RuntimeException JavaDoc(e.toString());
255     } catch (IllegalAccessException JavaDoc e) {
256       throw new RuntimeException JavaDoc(e.toString());
257     }
258     return param;
259   }
260
261 }
262
Popular Tags