1 23 24 package com.rift.coad.lib.httpd; 26 27 import java.util.Vector ; 29 30 import org.apache.log4j.Logger; 32 33 import com.rift.coad.lib.thread.BasicThread; 35 import com.rift.coad.lib.thread.CoadunationThreadGroup; 36 import com.rift.coad.lib.thread.ThreadStateMonitor; 37 import com.rift.coad.lib.security.ThreadsPermissionContainer; 38 import com.rift.coad.lib.configuration.Configuration; 39 import com.rift.coad.lib.configuration.ConfigurationFactory; 40 41 46 public class HttpRequestManager { 47 48 public class HttpRequestQueue { 50 private Vector requestQueue = new Vector (); 52 53 56 public HttpRequestQueue() { 57 58 } 59 60 61 68 public synchronized void pushRequest( 69 RequestInterface requestInterface) throws HttpdException { 70 processorPoolManager.notifyThread(); 71 requestQueue.add(requestInterface); 72 } 73 74 75 78 public synchronized RequestInterface popRequest() { 79 if (requestQueue.size() == 0) { 80 return null; 81 } 82 RequestInterface requestInterface = 83 (RequestInterface)requestQueue.get(0); 84 requestQueue.remove(0); 85 return requestInterface; 86 } 87 88 89 94 public synchronized int size() { 95 return requestQueue.size(); 96 } 97 } 98 99 100 104 public class HttpProcessor extends BasicThread { 105 106 private ThreadStateMonitor threadStateMonitor = 108 new ThreadStateMonitor(); 109 110 113 public HttpProcessor() throws Exception { 114 115 } 116 117 118 123 public void process() throws Exception { 124 log.debug("Process."); 125 while(threadStateMonitor.isTerminated() == false) { 126 log.debug("Wait for request."); 127 if (processorPoolManager.monitor() == false) { 128 break; 129 } 130 log.debug("Get a request."); 131 RequestInterface requestInterface = 132 requestQueue.popRequest(); 133 if (requestInterface != null) { 134 try { 135 log.info("Process a new http request."); 136 requestInterface.handleRequest(); 137 } catch (Exception ex) { 138 log.error("Failed to process an HTTP request [" + 139 ex.getMessage() + "].",ex); 140 } finally { 141 requestInterface.destroy(); 142 } 143 } 144 } 145 log.debug("Http Processor exiting."); 146 } 147 148 149 153 public void terminate() { 154 log.debug("Terminate called on Http Processor"); 155 threadStateMonitor.terminate(true); 156 } 157 } 158 159 160 163 public class ProcessorPoolManager { 164 165 private boolean running = true; 167 private CoadunationThreadGroup threadGroup = null; 168 private int min = 0; 169 private int max = 0; 170 private int currentSize = 0; 171 private int waitingThreads = 0; 172 private int requestCount = 0; 173 private String username = null; 174 175 176 184 public ProcessorPoolManager(CoadunationThreadGroup threadGroup,int min, int max, 185 String username) 186 throws HttpdException { 187 try { 188 this.threadGroup = threadGroup.createThreadGroup(); 189 this.min = min; 190 this.max = max; 191 this.username = username; 192 } catch (Exception ex) { 193 throw new HttpdException( 194 "Failed to instanciate the processor pool manager : " + 195 ex.getMessage(),ex); 196 } 197 } 198 199 200 203 public void shutdown() { 204 synchronized (this) { 205 running = false; 206 notifyAll(); 207 } 208 log.debug("Terminate the thread group"); 209 threadGroup.terminate(); 210 } 211 212 213 217 public synchronized void notifyThread() throws HttpdException { 218 if (running) { 219 requestCount++; 220 if (waitingThreads > 0) { 221 log.debug("Notify a waiting thread 2"); 222 notify(); 223 } else if (currentSize < max) { 224 try { 225 log.debug("Add a thread"); 226 HttpProcessor httpProcessor = new HttpProcessor(); 228 threadGroup.addThread(httpProcessor,username); 229 httpProcessor.start(); 230 currentSize++; 231 } catch (Exception ex) { 232 throw new HttpdException 233 ("Failed create a thread to process task : " + 234 ex.getMessage(),ex); 235 } 236 } 237 } else { 238 throw new HttpdException 239 ("Shut Down can handle not more requests"); 240 } 241 } 242 243 244 251 public synchronized boolean monitor() { 252 if (running == false) { 253 currentSize--; 254 return false; 255 } else if (requestCount > 0) { 256 requestCount --; 257 return true; 258 } else if (currentSize > min) { 259 currentSize--; 260 return false; 261 } 262 try { 263 waitingThreads++; 264 wait(); 265 waitingThreads--; 266 } catch (Exception ex) { 267 } 269 return running; 270 } 271 } 272 273 private final static String MIN_KEY = "pool_min"; 275 private final static long MIN_DEFAULT = 10; 276 private final static String MAX_KEY = "pool_max"; 277 private final static long MAX_DEFAULT = 20; 278 279 private Logger log = 281 Logger.getLogger(HttpRequestManager.class.getName()); 282 283 private Configuration configuration = null; 285 private HttpRequestQueue requestQueue = new HttpRequestQueue(); 286 private ProcessorPoolManager processorPoolManager = null; 287 288 289 296 public HttpRequestManager(CoadunationThreadGroup threadGroup) throws HttpdException { 297 try { 298 configuration = ConfigurationFactory.getInstance().getConfig( 299 this.getClass()); 300 processorPoolManager = new ProcessorPoolManager(threadGroup, 301 (int)configuration.getLong(MIN_KEY,MIN_DEFAULT), 302 (int)configuration.getLong(MAX_KEY,MAX_DEFAULT), 303 configuration.getString(HttpDaemon.USERNAME_KEY)); 304 } catch (Exception ex) { 305 throw new HttpdException( 306 "Failed to instanciate the http request manager : " + 307 ex.getMessage(),ex); 308 } 309 } 310 311 312 319 public void addRequest(RequestInterface requestInterface) 320 throws HttpdException { 321 requestQueue.pushRequest(requestInterface); 322 } 323 324 325 328 public void shutdown() { 329 processorPoolManager.shutdown(); 330 } 331 } 332 | Popular Tags |