1 package org.sapia.ubik.net.mplex; 2 3 import java.io.ByteArrayOutputStream ; 4 import java.io.IOException ; 5 import java.io.InputStream ; 6 import java.io.ObjectInputStream ; 7 import java.io.ObjectOutputStream ; 8 9 import java.net.Socket ; 10 11 12 24 public class MultiplexedServer { 25 public static long START_TIME = System.currentTimeMillis(); 26 private MultiplexServerSocket _server; 27 28 31 public MultiplexedServer() throws IOException { 32 int backlog = 100; 33 _server = new MultiplexServerSocket(7777, backlog); 34 log("Started multiplex server on port 7777"); 35 log("Setting backlog queue to " + backlog); 36 log("Setting " + _server.getAcceptorDaemonThread() + 37 " acceptor daemon threads"); 38 log("Setting " + _server.getSelectorDaemonThread() + 39 " selector daemon threads"); 40 41 MultiplexSocketConnector httpSocket = _server.createSocketConnector(new HttpStreamSelector( 42 null, null)); 43 HttpHandler httpHandler = new HttpHandler(httpSocket); 44 Thread httpThread = new Thread (httpHandler, 45 "HTTP-Processor"); 46 httpThread.start(); 47 48 MultiplexSocketConnector objectSocket = _server.createSocketConnector(new ObjectStreamSelector()); 49 ObjectHandler objectHandler = new ObjectHandler(objectSocket); 50 Thread objectThread = new Thread (objectHandler, 51 "Object-Processor"); 52 objectThread.start(); 53 } 54 55 public static void main(String [] args) { 56 try { 57 new MultiplexedServer().run(); 58 } catch (IOException ioe) { 59 ioe.printStackTrace(); 60 } 61 } 62 63 public static synchronized void log(String log) { 64 StringBuffer aBuffer = new StringBuffer (); 65 aBuffer.append(System.currentTimeMillis() - START_TIME).append(" [") 66 .append(Thread.currentThread().getName()).append("] ").append(log); 67 System.out.println(aBuffer.toString()); 68 } 69 70 public void run() { 71 log("Started default server socket handler..."); 72 73 int counter = 0; 74 75 try { 76 while (true) { 77 Socket client = _server.accept(); 79 new Thread (new ObjectServer(client), "ObjectServer-" + (++counter)).start(); 80 } 81 } catch (Exception e) { 82 e.printStackTrace(); 83 } finally { 84 try { 85 _server.close(); 86 } catch (IOException ioe) { 87 ioe.printStackTrace(); 88 } 89 } 90 } 91 92 96 public static class HttpHandler implements Runnable { 97 private MultiplexSocketConnector _socket; 98 99 public HttpHandler(MultiplexSocketConnector socket) { 100 _socket = socket; 101 } 102 103 public void run() { 104 log("Started HTTP server socket handler..."); 105 106 int counter = 0; 107 108 try { 109 while (true) { 110 Socket client = _socket.accept(); 112 new Thread (new HttpServer(client), "HttpServer-" + (++counter)).start(); 113 } 114 } catch (Exception e) { 115 e.printStackTrace(); 116 } finally { 117 try { 118 _socket.close(); 119 } catch (IOException ioe) { 120 ioe.printStackTrace(); 121 } 122 } 123 } 124 } 125 126 130 public static class HttpServer implements Runnable { 131 private Socket _client; 132 133 public HttpServer(Socket client) { 134 _client = client; 135 } 136 137 public void run() { 138 log("Starting HTTP server..."); 139 140 try { 141 InputStream is = _client.getInputStream(); 143 ByteArrayOutputStream request = new ByteArrayOutputStream (); 144 boolean isDone = false; 145 byte[] data = new byte[1024]; 146 147 while (!isDone) { 148 int length = is.read(data); 149 150 if (length >= 0) { 151 request.write(data, 0, length); 152 } 153 154 isDone = is.available() == 0; 155 } 156 157 StringBuffer aBuffer = new StringBuffer ("===> Got an HTTP request\n"); 159 String aPost = request.toString("UTF-8"); 160 aBuffer.append(aPost.substring(aPost.lastIndexOf("\r\n\r\n") + 4)); 161 log(aBuffer.toString()); 162 163 _client.getOutputStream().write("HTTP ACK".getBytes("UTF-8")); 165 _client.getOutputStream().flush(); 166 } catch (Exception e) { 167 e.printStackTrace(); 168 } finally { 169 try { 170 _client.close(); 171 } catch (IOException ioe) { 172 ioe.printStackTrace(); 173 } 174 } 175 } 176 } 177 178 182 public static class ObjectHandler implements Runnable { 183 private MultiplexSocketConnector _socket; 184 185 public ObjectHandler(MultiplexSocketConnector socket) { 186 _socket = socket; 187 } 188 189 public void run() { 190 log("Started Object server socket handler..."); 191 192 int counter = 0; 193 194 try { 195 while (true) { 196 Socket client = _socket.accept(); 198 new Thread (new ObjectServer(client), "ObjectServer-" + (++counter)).start(); 199 } 200 } catch (Exception e) { 201 e.printStackTrace(); 202 } finally { 203 try { 204 _socket.close(); 205 } catch (IOException ioe) { 206 ioe.printStackTrace(); 207 } 208 } 209 } 210 } 211 212 216 public static class ObjectServer implements Runnable { 217 private Socket _client; 218 219 public ObjectServer(Socket client) { 220 _client = client; 221 } 222 223 public void run() { 224 log("Started Object server..."); 225 226 int count = 0; 227 228 try { 229 ObjectInputStream request = null; 230 ObjectOutputStream response = null; 231 232 while (true) { 233 if (request == null) { 235 request = new ObjectInputStream (_client.getInputStream()); 236 } 237 238 StringBuffer aBuffer = new StringBuffer ( 239 "===> Got an Java Object request [" + (++count) + "]\n"); 240 aBuffer.append(request.readObject()); 241 log(aBuffer.toString()); 242 243 if (response == null) { 245 response = new ObjectOutputStream (_client.getOutputStream()); 246 } 247 248 response.writeObject("Java Object Ack"); 249 response.flush(); 250 } 251 } catch (Exception e) { 252 e.printStackTrace(); 253 } finally { 254 try { 255 _client.close(); 256 } catch (IOException ioe) { 257 ioe.printStackTrace(); 258 } 259 } 260 } 261 } 262 263 267 public static class DefaultServer implements Runnable { 268 private Socket _client; 269 270 public DefaultServer(Socket client) { 271 _client = client; 272 } 273 274 public void run() { 275 log("Started Default server..."); 276 277 int count = 0; 278 279 try { 280 while (true) { 281 InputStream is = _client.getInputStream(); 283 ByteArrayOutputStream request = new ByteArrayOutputStream (); 284 boolean isDone = false; 285 byte[] data = new byte[1024]; 286 287 while (!isDone) { 288 int length = is.read(data); 289 290 if (length >= 0) { 291 request.write(data, 0, length); 292 } 293 294 isDone = is.available() == 0; 295 } 296 297 StringBuffer aBuffer = new StringBuffer ( 298 "===> Got a default request [" + (++count) + "]\n"); 299 aBuffer.append(request); 300 log(aBuffer.toString()); 301 302 _client.getOutputStream().write("DEFAULT ACK".getBytes("UTF-8")); 304 _client.getOutputStream().flush(); 305 } 306 } catch (Exception e) { 307 e.printStackTrace(); 308 } finally { 309 try { 310 _client.close(); 311 } catch (IOException ioe) { 312 ioe.printStackTrace(); 313 } 314 } 315 } 316 } 317 } 318 | Popular Tags |