1 2 3 4 package net.nutch.ipc; 5 6 import java.io.IOException ; 7 import java.io.EOFException ; 8 import java.io.DataInputStream ; 9 import java.io.DataOutputStream ; 10 import java.io.BufferedInputStream ; 11 import java.io.BufferedOutputStream ; 12 13 import java.net.Socket ; 14 import java.net.ServerSocket ; 15 import java.net.SocketTimeoutException ; 16 17 import java.util.LinkedList ; 18 import java.util.logging.Logger ; 19 import java.util.logging.Level ; 20 21 import net.nutch.util.LogFormatter; 22 import net.nutch.io.Writable; 23 import net.nutch.io.UTF8; 24 25 32 public abstract class Server { 33 public static final Logger LOG = 34 LogFormatter.getLogger("net.nutch.ipc.Server"); 35 36 private int port; private int handlerCount; private int maxQueuedCalls; private Class paramClass; 41 private int timeout = 10000; private boolean running = true; private LinkedList callQueue = new LinkedList (); private Object callDequeued = new Object (); 46 47 private static class Call { 48 private int id; private Writable param; private Connection connection; 52 public Call(int id, Writable param, Connection connection) { 53 this.id = id; 54 this.param = param; 55 this.connection = connection; 56 } 57 } 58 59 60 private class Listener extends Thread { 61 private ServerSocket socket; 62 63 public Listener() throws IOException { 64 this.socket = new ServerSocket (port); 65 socket.setSoTimeout(timeout); 66 this.setDaemon(true); 67 this.setName("Server listener on port " + port); 68 } 69 70 public void run() { 71 LOG.info(getName() + ": starting"); 72 while (running) { 73 try { 74 new Connection(socket.accept()).start(); } catch (SocketTimeoutException e) { } catch (Exception e) { LOG.log(Level.INFO, getName() + " caught: " + e, e); 78 } 79 } 80 try { 81 socket.close(); 82 } catch (IOException e) {} 83 LOG.info(getName() + ": exiting"); 84 } 85 } 86 87 88 private class Connection extends Thread { 89 private Socket socket; 90 private DataInputStream in; 91 private DataOutputStream out; 92 93 public Connection(Socket socket) throws IOException { 94 this.socket = socket; 95 socket.setSoTimeout(timeout); 96 this.in = new DataInputStream 97 (new BufferedInputStream (socket.getInputStream())); 98 this.out = new DataOutputStream 99 (new BufferedOutputStream (socket.getOutputStream())); 100 this.setDaemon(true); 101 this.setName("Server connection on port " + port + " from " 102 + socket.getInetAddress().getHostAddress()); 103 } 104 105 public void run() { 106 LOG.info(getName() + ": starting"); 107 try { 108 while (running) { 109 int id; 110 try { 111 id = in.readInt(); } catch (SocketTimeoutException e) { 113 continue; 114 } 115 116 if (LOG.isLoggable(Level.FINE)) 117 LOG.fine(getName() + " got #" + id); 118 119 Writable param = makeParam(); param.readFields(in); 121 122 Call call = new Call(id, param, this); 123 124 synchronized (callQueue) { 125 callQueue.addLast(call); callQueue.notify(); } 128 129 while (running && callQueue.size() >= maxQueuedCalls) { 130 synchronized (callDequeued) { callDequeued.wait(timeout); } 133 } 134 } 135 } catch (EOFException eof) { 136 } catch (Exception e) { 138 LOG.log(Level.INFO, getName() + " caught: " + e, e); 139 } finally { 140 try { 141 socket.close(); 142 } catch (IOException e) {} 143 LOG.info(getName() + ": exiting"); 144 } 145 } 146 147 } 148 149 150 private class Handler extends Thread { 151 public Handler() { 152 this.setDaemon(true); 153 this.setName("Server handler on " + port); 154 } 155 156 public void run() { 157 LOG.info(getName() + ": starting"); 158 while (running) { 159 try { 160 Call call; 161 synchronized (callQueue) { 162 while (running && callQueue.size()==0) { callQueue.wait(timeout); 164 } 165 if (!running) break; 166 call = (Call)callQueue.removeFirst(); } 168 169 synchronized (callDequeued) { callDequeued.notify(); 171 } 172 173 if (LOG.isLoggable(Level.FINE)) 174 LOG.fine(getName() + ": has #" + call.id + " from " + 175 call.connection.socket.getInetAddress().getHostAddress()); 176 177 String error = null; 178 Writable value = null; 179 try { 180 value = call(call.param); } catch (Exception e) { 182 LOG.log(Level.INFO, getName() + " call error: " + e, e); 183 error = e.toString(); 184 } 185 186 DataOutputStream out = call.connection.out; 187 synchronized (out) { 188 out.writeInt(call.id); out.writeBoolean(error!=null); if (error != null) 191 value = new UTF8(error); 192 value.write(out); out.flush(); 194 } 195 196 } catch (Exception e) { 197 LOG.log(Level.INFO, getName() + " caught: " + e, e); 198 } 199 } 200 LOG.info(getName() + ": exiting"); 201 } 202 } 203 204 208 protected Server(int port, Class paramClass, int handlerCount) { 209 this.port = port; 210 this.paramClass = paramClass; 211 this.handlerCount = handlerCount; 212 this.maxQueuedCalls = handlerCount; 213 } 214 215 216 public void setTimeout(int timeout) { this.timeout = timeout; } 217 218 219 public synchronized void start() throws IOException { 220 Listener listener = new Listener(); 221 listener.start(); 222 223 for (int i = 0; i < handlerCount; i++) { 224 Handler handler = new Handler(); 225 handler.start(); 226 } 227 } 228 229 231 public synchronized void stop() { 232 LOG.info("Stopping server on " + port); 233 running = false; 234 try { 235 Thread.sleep(timeout); } catch (InterruptedException e) {} 237 notify(); 238 } 239 240 241 public synchronized void join() throws InterruptedException { 242 wait(); 243 } 244 245 246 public abstract Writable call(Writable param) throws IOException ; 247 248 249 private Writable makeParam() { 250 Writable param; try { 252 param = (Writable)paramClass.newInstance(); 253 } catch (InstantiationException e) { 254 throw new RuntimeException (e.toString()); 255 } catch (IllegalAccessException e) { 256 throw new RuntimeException (e.toString()); 257 } 258 return param; 259 } 260 261 } 262 | Popular Tags |