1 9 package org.jboss.remoting.transport.http; 10 11 import java.io.BufferedOutputStream ; 12 import java.io.ByteArrayOutputStream ; 13 import java.io.DataInputStream ; 14 import java.io.DataOutputStream ; 15 import java.io.IOException ; 16 import java.net.InetAddress ; 17 import java.net.ServerSocket ; 18 import java.net.Socket ; 19 import java.util.HashMap ; 20 import java.util.Map ; 21 import javax.management.MBeanServer ; 22 import javax.management.MBeanServerInvocationHandler ; 23 import javax.management.MalformedObjectNameException ; 24 import javax.management.ObjectName ; 25 import org.apache.commons.httpclient.Header; 26 import org.apache.commons.httpclient.HttpParser; 27 import org.jboss.remoting.InvocationRequest; 28 import org.jboss.remoting.InvokerLocator; 29 import org.jboss.remoting.marshal.MarshalFactory; 30 import org.jboss.remoting.marshal.Marshaller; 31 import org.jboss.remoting.marshal.UnMarshaller; 32 import org.jboss.remoting.marshal.http.HTTPMarshaller; 33 import org.jboss.remoting.marshal.http.HTTPUnMarshaller; 34 import org.jboss.remoting.marshal.serializable.SerializableMarshaller; 35 import org.jboss.remoting.transport.web.WebServerInvoker; 36 import org.jboss.util.threadpool.BasicThreadPool; 37 import org.jboss.util.threadpool.BlockingMode; 38 import org.jboss.util.threadpool.ThreadPool; 39 import org.jboss.util.threadpool.ThreadPoolMBean; 40 41 46 47 51 public class HTTPServerInvoker extends WebServerInvoker implements Runnable 52 { 53 public static final String MAX_NUM_HTTP_THREADS_KEY = "maxNumThreadsHTTP"; 54 public static final String HTTP_THREAD_POOL_CLASS_KEY = "HTTPThreadPool"; 55 56 private String httpThreadPoolClass = null; 57 58 private static int BACKLOG_DEFAULT = 1000; 59 private static int MAX_POOL_SIZE_DEFAULT = 100; 60 61 private ServerSocket serverSocket = null; 62 63 private boolean running = false; 64 65 private ThreadPool httpThreadPool; 66 private int maxPoolSize = MAX_POOL_SIZE_DEFAULT; 67 68 protected int backlog = BACKLOG_DEFAULT; 69 70 public static String HTML = "text/html"; 72 public static String PLAIN = "text/plain"; 73 public static String SOAP = "application/soap+xml"; 74 75 public HTTPServerInvoker(InvokerLocator locator) 76 { 77 super(locator); 78 } 79 80 public HTTPServerInvoker(InvokerLocator locator, Map configuration) 81 { 82 super(locator, configuration); 83 } 84 85 protected String getDefaultDataType() 86 { 87 return SerializableMarshaller.DATATYPE; 88 } 89 90 protected void setup() throws Exception 91 { 92 super.setup(); 93 94 Map config = getConfiguration(); 95 String maxNumOfThreads = (String ) config.get(MAX_NUM_HTTP_THREADS_KEY); 96 if(maxNumOfThreads != null && maxNumOfThreads.length() > 0) 97 { 98 try 99 { 100 maxPoolSize = Integer.parseInt(maxNumOfThreads); 101 } 102 catch(NumberFormatException e) 103 { 104 log.error("Can not convert max number of threads value (" + maxNumOfThreads + ") into a number."); 105 } 106 } 107 httpThreadPoolClass = (String ) config.get(HTTP_THREAD_POOL_CLASS_KEY); 108 109 } 110 111 public void setMaxNumberOfHTTPThreads(int numOfThreads) 112 { 113 this.maxPoolSize = numOfThreads; 114 } 115 116 public int getMaxNumberOfHTTPThreads() 117 { 118 return this.maxPoolSize; 119 } 120 121 public ThreadPool getHTTPThreadPool() 122 { 123 if(httpThreadPool == null) 124 { 125 if(httpThreadPoolClass == null || httpThreadPoolClass.length() == 0) 127 { 128 BasicThreadPool basicthreadpool = new BasicThreadPool("JBossRemoting - HTTP Server Invoker"); 129 basicthreadpool.setBlockingMode(BlockingMode.RUN); 130 basicthreadpool.setMaximumPoolSize(maxPoolSize); 131 httpThreadPool = basicthreadpool; 132 } 133 else 134 { 135 boolean isObjName = false; 137 try 138 { 139 ObjectName objName = new ObjectName (httpThreadPoolClass); 140 httpThreadPool = createThreadPoolProxy(objName); 141 isObjName = true; 142 } 143 catch(MalformedObjectNameException e) 144 { 145 log.debug("Thread pool class supplied is not an object name."); 146 } 147 148 if(!isObjName) 149 { 150 try 151 { 152 httpThreadPool = (ThreadPool) getClassLoader().loadClass(httpThreadPoolClass).newInstance(); 153 } 154 catch(Exception e) 155 { 156 throw new RuntimeException ("Error loading instance of ThreadPool based on class name: " + httpThreadPoolClass); 157 } 158 } 159 } 160 } 161 return httpThreadPool; 162 } 163 164 private ThreadPool createThreadPoolProxy(ObjectName objName) 165 { 166 ThreadPool pool; 167 MBeanServer server = getMBeanServer(); 168 if(server != null) 169 { 170 ThreadPoolMBean poolMBean = (ThreadPoolMBean) 171 MBeanServerInvocationHandler.newProxyInstance(server, 172 objName, 173 ThreadPoolMBean.class, 174 false); 175 pool = poolMBean.getInstance(); 176 } 177 else 178 { 179 throw new RuntimeException ("Can not register MBean ThreadPool as the ServerInvoker has not been registered with a MBeanServer."); 180 } 181 return pool; 182 } 183 184 185 public void setHTTPThreadPool(ThreadPool pool) 186 { 187 this.httpThreadPool = pool; 188 } 189 190 191 public void start() throws IOException 192 { 193 if(!running) 194 { 195 running = true; 196 197 try 198 { 199 ThreadPool httpThreadPool = getHTTPThreadPool(); 200 InetAddress bindAddress = InetAddress.getByName(getServerBindAddress()); 201 serverSocket = createServerSocket(getServerBindPort(), backlog, bindAddress); 202 203 for(int t = 0; t < maxPoolSize; t++) 205 { 206 httpThreadPool.run(this); 207 } 208 } 209 catch(IOException e) 210 { 211 log.error("Error starting ServerSocket. Bind port: " + getServerBindPort() + ", bind address: " + getServerBindAddress()); 212 throw e; 213 } 214 } 215 super.start(); 216 } 217 218 protected ServerSocket createServerSocket(int serverBindPort, int backlog, InetAddress bindAddress) throws IOException 219 { 220 return new ServerSocket (serverBindPort, backlog, bindAddress); 221 } 222 223 public void run() 224 { 225 try 226 { 227 Socket socket = serverSocket.accept(); 228 229 if(socket != null) 230 { 231 httpThreadPool.run(this); 232 processRequest(socket); 233 } 234 } 235 catch(Throwable thr) 236 { 237 if(running) 238 { 239 log.error("Error processing incoming request.", thr); 240 } 241 } 242 } 243 244 public void stop() 245 { 246 if(running) 247 { 248 running = false; 249 250 maxPoolSize = 0; 252 try 253 { 254 httpThreadPool.stop(false); 255 httpThreadPool.waitForTasks(2000); 256 } 257 catch(InterruptedException e) 258 { 259 e.printStackTrace(); 260 } 261 262 try 263 { 264 if(serverSocket != null && !serverSocket.isClosed()) 265 { 266 serverSocket.close(); 267 } 268 serverSocket = null; 269 } 270 catch(Exception e) 271 { 272 } 273 } 274 super.stop(); 275 276 log.debug("HTTPServerInvoker stopped."); 277 } 278 279 private void processRequest(Socket socket) 280 { 281 DataInputStream dataInput = null; 282 DataOutputStream dataOutput = null; 283 BufferedOutputStream bufferOutput = null; 284 try 285 { 286 Object response = null; 287 boolean isError = false; 288 String requestContentType = null; 289 try 290 { 291 dataInput = new DataInputStream (socket.getInputStream()); 292 293 295 300 ByteArrayOutputStream buffer = new ByteArrayOutputStream (); 301 int ch; 302 while((ch = dataInput.read()) >= 0) 303 { 304 buffer.write(ch); 305 if(ch == '\n') 306 { 307 break; 308 } 309 } 310 311 String methodType = null; 312 String path = null; 313 String httpVersion = null; 314 315 byte[] firstLineRaw = buffer.toByteArray(); 316 if(firstLineRaw[firstLineRaw.length - 2] == '\r') 318 { 319 String firstLine = new String (firstLineRaw).trim(); 321 int startIndex = 0; 322 int endIndex = firstLine.indexOf(' '); 323 methodType = firstLine.substring(startIndex, endIndex); 324 startIndex = endIndex + 1; 325 endIndex = firstLine.indexOf(' ', startIndex); 326 path = firstLine.substring(startIndex, endIndex); 327 startIndex = endIndex + 1; 328 httpVersion = firstLine.substring(startIndex); 329 } 330 else 331 { 332 log.error("Error processing first line. Should have ended in \r\n, but did not"); 333 throw new RuntimeException ("Error processing HTTP request type. First line of request is invalid."); 334 } 335 336 Map metadata = new HashMap (); 337 Header[] headers = HttpParser.parseHeaders(dataInput); 338 for(int x = 0; x < headers.length; x++) 339 { 340 String headerName = headers[x].getName(); 341 String headerValue = headers[x].getValue(); 342 metadata.put(headerName, headerValue); 343 if("Content-Type".equalsIgnoreCase(headerName)) 345 { 346 requestContentType = headers[x].getValue(); 347 } 348 } 349 350 UnMarshaller unmarshaller = MarshalFactory.getUnMarshaller(HTTPUnMarshaller.DATATYPE); 351 Object obj = unmarshaller.read(dataInput, metadata); 352 353 InvocationRequest request = null; 354 355 if(obj instanceof InvocationRequest) 356 { 357 request = (InvocationRequest) obj; 358 } 359 else 360 { 361 if(isBinary(requestContentType)) 362 { 363 request = getInvocationRequest(metadata, obj); 364 } 365 else 366 { 367 request = createNewInvocationRequest(metadata, obj); 368 } 369 } 370 371 try 372 { 373 response = invoke(request); 375 } 376 catch(Throwable ex) 377 { 378 log.debug("Error thrown calling invoke on server invoker.", ex); 379 response = ex; 380 isError = true; 381 } 382 383 dataOutput = new DataOutputStream (socket.getOutputStream()); 384 bufferOutput = new BufferedOutputStream (dataOutput, 1024); 385 } 386 catch(Throwable thr) 387 { 388 log.debug("Error thrown processing request. Probably error with processing headers.", thr); 389 if(thr instanceof Exception ) 390 { 391 response = (Exception ) thr; 392 } 393 else 394 { 395 response = new Exception (thr); 396 } 397 isError = true; 398 } 399 if(response == null) 400 { 401 bufferOutput.write("HTTP/1.1 ".getBytes()); 403 bufferOutput.write("200 JBoss Remoting: Request Processed Successfully".getBytes()); 404 String contentType = "\r\n" + "Content-Type" + ": " + "text/html"; 405 bufferOutput.write(contentType.getBytes()); 406 String contentLength = "\r\n" + "Content-Length" + ": " + 0; 407 bufferOutput.write(contentLength.getBytes()); 408 bufferOutput.flush(); 411 } 412 else 413 { 414 bufferOutput.write("HTTP/1.1 ".getBytes()); 416 String status = isError ? "500 JBoss Remoting: Error occurred within target application." : 417 "200 JBoss Remoting: Request Processed Succussfully"; 418 bufferOutput.write(status.getBytes()); 419 420 String contentType = "\r\n" + "Content-Type" + ": " + requestContentType; 422 bufferOutput.write(contentType.getBytes()); 423 int iContentLength = getContentLength(response); 424 String contentLength = "\r\n" + "Content-Length" + ": " + iContentLength; 425 bufferOutput.write(contentLength.getBytes()); 426 bufferOutput.write(("\r\n" + "\r\n").getBytes()); 428 429 Marshaller marshaller = MarshalFactory.getMarshaller(getLocator(), this.getClass().getClassLoader()); 432 if(marshaller == null) 433 { 434 marshaller = MarshalFactory.getMarshaller(HTTPMarshaller.DATATYPE); 435 } 436 marshaller.write(response, bufferOutput); 437 438 } 439 } 440 catch(Exception e) 441 { 442 log.error("Error processing client request.", e); 443 } 444 finally 445 { 446 if(dataInput != null) 447 { 448 try 449 { 450 dataInput.close(); 451 } 452 catch(Exception e) 453 { 454 log.warn("Error closing resource.", e); 455 } 456 } 457 if(dataOutput != null) 458 { 459 try 460 { 461 dataOutput.close(); 462 } 463 catch(Exception e) 464 { 465 log.warn("Error closing resource.", e); 466 } 467 } 468 try 469 { 470 socket.close(); 471 } 472 catch(Exception e) 473 { 474 log.warn("Error closing resource.", e); 475 } 476 } 477 } 478 } 479 | Popular Tags |