1 8 package org.codehaus.aspectwerkz.connectivity; 9 10 import EDU.oswego.cs.dl.util.concurrent.BoundedBuffer; 11 import EDU.oswego.cs.dl.util.concurrent.LinkedQueue; 12 import EDU.oswego.cs.dl.util.concurrent.PooledExecutor; 13 import org.codehaus.aspectwerkz.exception.WrappedRuntimeException; 14 15 import java.io.FileInputStream ; 16 import java.io.IOException ; 17 import java.net.InetAddress ; 18 import java.net.ServerSocket ; 19 import java.net.Socket ; 20 import java.util.Properties ; 21 22 29 public class RemoteProxyServer implements Runnable { 30 private static String HOST_NAME; 31 32 private static int PORT; 33 34 private static boolean BOUNDED_THREAD_POOL; 35 36 private static boolean LISTENER_THREAD_RUN_AS_DAEMON; 37 38 private static int BACKLOG; 39 40 private static int NUM_LISTENER_THREADS; 41 42 private static int LISTENER_THREAD_PRIORITY = Thread.NORM_PRIORITY; 43 44 private static int CLIENT_THREAD_TIMEOUT; 45 46 private static int THREAD_POOL_MAX_SIZE; 47 48 private static int THREAD_POOL_MIN_SIZE; 49 50 private static int THREAD_POOL_INIT_SIZE; 51 52 private static int THREAD_POOL_KEEP_ALIVE_TIME; 53 54 private static boolean THREAD_POOL_WAIT_WHEN_BLOCKED; 55 56 59 static { 60 Properties properties = new Properties (); 61 try { 62 properties.load(new FileInputStream (System.getProperty("aspectwerkz.resource.bundle"))); 63 } catch (Exception e) { 64 System.out.println("no aspectwerkz resource bundle found on classpath, using defaults"); 65 66 } 68 String property = properties.getProperty("remote.server.hostname"); 69 if (property == null) { 70 HOST_NAME = property; 71 } else { 72 HOST_NAME = property; 73 } 74 property = properties.getProperty("remote.server.port"); 75 if (property == null) { 76 PORT = 7777; 77 } else { 78 PORT = Integer.parseInt(property); 79 } 80 property = properties.getProperty("remote.server.listener.threads.backlog"); 81 if (property == null) { 82 BACKLOG = 200; 83 } else { 84 BACKLOG = Integer.parseInt(property); 85 } 86 property = properties.getProperty("remote.server.listener.threads.nr"); 87 if (property == null) { 88 NUM_LISTENER_THREADS = 10; 89 } else { 90 NUM_LISTENER_THREADS = Integer.parseInt(property); 91 } 92 property = properties.getProperty("remote.server.client.threads.timeout"); 93 if (property == null) { 94 CLIENT_THREAD_TIMEOUT = 60000; 95 } else { 96 CLIENT_THREAD_TIMEOUT = Integer.parseInt(property); 97 } 98 property = properties.getProperty("remote.server.thread.pool.max.size"); 99 if (property == null) { 100 THREAD_POOL_MAX_SIZE = 100; 101 } else { 102 THREAD_POOL_MAX_SIZE = Integer.parseInt(property); 103 } 104 property = properties.getProperty("remote.server.thread.pool.min.size"); 105 if (property == null) { 106 THREAD_POOL_MIN_SIZE = 10; 107 } else { 108 THREAD_POOL_MIN_SIZE = Integer.parseInt(property); 109 } 110 property = properties.getProperty("remote.server.thread.pool.init.size"); 111 if (property == null) { 112 THREAD_POOL_INIT_SIZE = 10; 113 } else { 114 THREAD_POOL_INIT_SIZE = Integer.parseInt(property); 115 } 116 property = properties.getProperty("remote.server.thread.pool.keep.alive.time"); 117 if (property == null) { 118 THREAD_POOL_KEEP_ALIVE_TIME = 300000; 119 } else { 120 THREAD_POOL_KEEP_ALIVE_TIME = Integer.parseInt(property); 121 } 122 property = properties.getProperty("remote.server.thread.pool.type"); 123 if ((property != null) && property.equals("dynamic")) { 124 BOUNDED_THREAD_POOL = false; 125 } else { 126 BOUNDED_THREAD_POOL = true; 127 } 128 property = properties.getProperty("remote.server.listener.threads.run.as.daemon"); 129 if ((property != null) && property.equals("true")) { 130 LISTENER_THREAD_RUN_AS_DAEMON = true; 131 } else { 132 LISTENER_THREAD_RUN_AS_DAEMON = false; 133 } 134 property = properties.getProperty("remote.server.thread.pool.wait.when.blocked"); 135 if ((property != null) && property.equals("true")) { 136 THREAD_POOL_WAIT_WHEN_BLOCKED = true; 137 } else { 138 THREAD_POOL_WAIT_WHEN_BLOCKED = false; 139 } 140 } 141 142 145 private ServerSocket m_serverSocket = null; 146 147 150 private Thread [] m_listenerThreads = null; 151 152 155 private PooledExecutor m_threadPool = null; 156 157 160 private ClassLoader m_loader = null; 161 162 165 private Invoker m_invoker = null; 166 167 170 private boolean m_running = true; 171 172 178 public RemoteProxyServer(final ClassLoader loader, final Invoker invoker) { 179 m_invoker = invoker; 180 m_loader = loader; 181 } 182 183 186 public void start() { 187 m_running = true; 188 try { 189 InetAddress bindAddress = InetAddress.getByName(HOST_NAME); 190 m_serverSocket = new ServerSocket (PORT, BACKLOG, bindAddress); 191 if (BOUNDED_THREAD_POOL) { 192 createBoundedThreadPool( 193 THREAD_POOL_MAX_SIZE, 194 THREAD_POOL_MIN_SIZE, 195 THREAD_POOL_INIT_SIZE, 196 THREAD_POOL_KEEP_ALIVE_TIME, 197 THREAD_POOL_WAIT_WHEN_BLOCKED 198 ); 199 } else { 200 createDynamicThreadPool(THREAD_POOL_MIN_SIZE, THREAD_POOL_INIT_SIZE, THREAD_POOL_KEEP_ALIVE_TIME); 201 } 202 m_listenerThreads = new Thread [NUM_LISTENER_THREADS]; 203 for (int i = 0; i < NUM_LISTENER_THREADS; i++) { 204 m_listenerThreads[i] = new Thread (this); 205 m_listenerThreads[i].setName("AspectWerkz::Listener " + (i + 1)); 206 m_listenerThreads[i].setDaemon(LISTENER_THREAD_RUN_AS_DAEMON); 207 m_listenerThreads[i].setPriority(LISTENER_THREAD_PRIORITY); 208 m_listenerThreads[i].start(); 209 } 210 } catch (IOException e) { 211 throw new WrappedRuntimeException(e); 212 } 213 } 214 215 218 public void stop() { 219 m_running = false; 220 for (int i = 0; i < NUM_LISTENER_THREADS; i++) { 221 m_listenerThreads[i].interrupt(); 222 } 223 m_threadPool.shutdownNow(); 224 } 225 226 230 public void run() { 231 try { 232 while (m_running) { 233 final Socket clientSocket = m_serverSocket.accept(); 234 synchronized (m_threadPool) { 235 m_threadPool.execute( 236 new RemoteProxyServerThread( 237 clientSocket, 238 m_loader, 239 m_invoker, 240 CLIENT_THREAD_TIMEOUT 241 ) 242 ); 243 } 244 } 245 m_serverSocket.close(); 246 } catch (Exception e) { 247 throw new WrappedRuntimeException(e); 248 } 249 } 250 251 260 private void createBoundedThreadPool(final int threadPoolMaxSize, 261 final int threadPoolMinSize, 262 final int threadPoolInitSize, 263 final int keepAliveTime, 264 final boolean waitWhenBlocked) { 265 m_threadPool = new PooledExecutor(new BoundedBuffer(threadPoolInitSize), threadPoolMaxSize); 266 m_threadPool.setKeepAliveTime(keepAliveTime); 267 m_threadPool.createThreads(threadPoolInitSize); 268 m_threadPool.setMinimumPoolSize(threadPoolMinSize); 269 if (waitWhenBlocked) { 270 m_threadPool.waitWhenBlocked(); 271 } 272 } 273 274 281 private void createDynamicThreadPool(final int threadPoolMinSize, 282 final int threadPoolInitSize, 283 final int keepAliveTime) { 284 m_threadPool = new PooledExecutor(new LinkedQueue()); 285 m_threadPool.setKeepAliveTime(keepAliveTime); 286 m_threadPool.createThreads(threadPoolInitSize); 287 m_threadPool.setMinimumPoolSize(threadPoolMinSize); 288 } 289 } | Popular Tags |