1 4 package com.tc.aspectwerkz.connectivity; 5 6 import EDU.oswego.cs.dl.util.concurrent.BoundedBuffer; 7 import EDU.oswego.cs.dl.util.concurrent.LinkedQueue; 8 import EDU.oswego.cs.dl.util.concurrent.PooledExecutor; 9 10 import com.tc.aspectwerkz.exception.WrappedRuntimeException; 11 12 import java.io.FileInputStream ; 13 import java.io.IOException ; 14 import java.net.InetAddress ; 15 import java.net.ServerSocket ; 16 import java.net.Socket ; 17 import java.util.Properties ; 18 19 26 public class RemoteProxyServer implements Runnable { 27 private static String HOST_NAME; 28 29 private static int PORT; 30 31 private static boolean BOUNDED_THREAD_POOL; 32 33 private static boolean LISTENER_THREAD_RUN_AS_DAEMON; 34 35 private static int BACKLOG; 36 37 private static int NUM_LISTENER_THREADS; 38 39 private static int LISTENER_THREAD_PRIORITY = Thread.NORM_PRIORITY; 40 41 private static int CLIENT_THREAD_TIMEOUT; 42 43 private static int THREAD_POOL_MAX_SIZE; 44 45 private static int THREAD_POOL_MIN_SIZE; 46 47 private static int THREAD_POOL_INIT_SIZE; 48 49 private static int THREAD_POOL_KEEP_ALIVE_TIME; 50 51 private static boolean THREAD_POOL_WAIT_WHEN_BLOCKED; 52 53 56 static { 57 Properties properties = new Properties (); 58 try { 59 properties.load(new FileInputStream (System.getProperty("aspectwerkz.resource.bundle"))); 60 } catch (Exception e) { 61 System.out.println("no aspectwerkz resource bundle found on classpath, using defaults"); 62 63 } 65 String property = properties.getProperty("remote.server.hostname"); 66 if (property == null) { 67 HOST_NAME = property; 68 } else { 69 HOST_NAME = property; 70 } 71 property = properties.getProperty("remote.server.port"); 72 if (property == null) { 73 PORT = 7777; 74 } else { 75 PORT = Integer.parseInt(property); 76 } 77 property = properties.getProperty("remote.server.listener.threads.backlog"); 78 if (property == null) { 79 BACKLOG = 200; 80 } else { 81 BACKLOG = Integer.parseInt(property); 82 } 83 property = properties.getProperty("remote.server.listener.threads.nr"); 84 if (property == null) { 85 NUM_LISTENER_THREADS = 10; 86 } else { 87 NUM_LISTENER_THREADS = Integer.parseInt(property); 88 } 89 property = properties.getProperty("remote.server.client.threads.timeout"); 90 if (property == null) { 91 CLIENT_THREAD_TIMEOUT = 60000; 92 } else { 93 CLIENT_THREAD_TIMEOUT = Integer.parseInt(property); 94 } 95 property = properties.getProperty("remote.server.thread.pool.max.size"); 96 if (property == null) { 97 THREAD_POOL_MAX_SIZE = 100; 98 } else { 99 THREAD_POOL_MAX_SIZE = Integer.parseInt(property); 100 } 101 property = properties.getProperty("remote.server.thread.pool.min.size"); 102 if (property == null) { 103 THREAD_POOL_MIN_SIZE = 10; 104 } else { 105 THREAD_POOL_MIN_SIZE = Integer.parseInt(property); 106 } 107 property = properties.getProperty("remote.server.thread.pool.init.size"); 108 if (property == null) { 109 THREAD_POOL_INIT_SIZE = 10; 110 } else { 111 THREAD_POOL_INIT_SIZE = Integer.parseInt(property); 112 } 113 property = properties.getProperty("remote.server.thread.pool.keep.alive.time"); 114 if (property == null) { 115 THREAD_POOL_KEEP_ALIVE_TIME = 300000; 116 } else { 117 THREAD_POOL_KEEP_ALIVE_TIME = Integer.parseInt(property); 118 } 119 property = properties.getProperty("remote.server.thread.pool.type"); 120 if ((property != null) && property.equals("dynamic")) { 121 BOUNDED_THREAD_POOL = false; 122 } else { 123 BOUNDED_THREAD_POOL = true; 124 } 125 property = properties.getProperty("remote.server.listener.threads.run.as.daemon"); 126 if ((property != null) && property.equals("true")) { 127 LISTENER_THREAD_RUN_AS_DAEMON = true; 128 } else { 129 LISTENER_THREAD_RUN_AS_DAEMON = false; 130 } 131 property = properties.getProperty("remote.server.thread.pool.wait.when.blocked"); 132 if ((property != null) && property.equals("true")) { 133 THREAD_POOL_WAIT_WHEN_BLOCKED = true; 134 } else { 135 THREAD_POOL_WAIT_WHEN_BLOCKED = false; 136 } 137 } 138 139 142 private ServerSocket m_serverSocket = null; 143 144 147 private Thread [] m_listenerThreads = null; 148 149 152 private PooledExecutor m_threadPool = null; 153 154 157 private ClassLoader m_loader = null; 158 159 162 private Invoker m_invoker = null; 163 164 167 private boolean m_running = true; 168 169 175 public RemoteProxyServer(final ClassLoader loader, final Invoker invoker) { 176 m_invoker = invoker; 177 m_loader = loader; 178 } 179 180 183 public void start() { 184 m_running = true; 185 try { 186 InetAddress bindAddress = InetAddress.getByName(HOST_NAME); 187 m_serverSocket = new ServerSocket (PORT, BACKLOG, bindAddress); 188 if (BOUNDED_THREAD_POOL) { 189 createBoundedThreadPool( 190 THREAD_POOL_MAX_SIZE, 191 THREAD_POOL_MIN_SIZE, 192 THREAD_POOL_INIT_SIZE, 193 THREAD_POOL_KEEP_ALIVE_TIME, 194 THREAD_POOL_WAIT_WHEN_BLOCKED 195 ); 196 } else { 197 createDynamicThreadPool(THREAD_POOL_MIN_SIZE, THREAD_POOL_INIT_SIZE, THREAD_POOL_KEEP_ALIVE_TIME); 198 } 199 m_listenerThreads = new Thread [NUM_LISTENER_THREADS]; 200 for (int i = 0; i < NUM_LISTENER_THREADS; i++) { 201 m_listenerThreads[i] = new Thread (this); 202 m_listenerThreads[i].setName("AspectWerkz::Listener " + (i + 1)); 203 m_listenerThreads[i].setDaemon(LISTENER_THREAD_RUN_AS_DAEMON); 204 m_listenerThreads[i].setPriority(LISTENER_THREAD_PRIORITY); 205 m_listenerThreads[i].start(); 206 } 207 } catch (IOException e) { 208 throw new WrappedRuntimeException(e); 209 } 210 } 211 212 215 public void stop() { 216 m_running = false; 217 for (int i = 0; i < NUM_LISTENER_THREADS; i++) { 218 m_listenerThreads[i].interrupt(); 219 } 220 m_threadPool.shutdownNow(); 221 } 222 223 227 public void run() { 228 try { 229 while (m_running) { 230 final Socket clientSocket = m_serverSocket.accept(); 231 synchronized (m_threadPool) { 232 m_threadPool.execute( 233 new RemoteProxyServerThread( 234 clientSocket, 235 m_loader, 236 m_invoker, 237 CLIENT_THREAD_TIMEOUT 238 ) 239 ); 240 } 241 } 242 m_serverSocket.close(); 243 } catch (Exception e) { 244 throw new WrappedRuntimeException(e); 245 } 246 } 247 248 257 private void createBoundedThreadPool(final int threadPoolMaxSize, 258 final int threadPoolMinSize, 259 final int threadPoolInitSize, 260 final int keepAliveTime, 261 final boolean waitWhenBlocked) { 262 m_threadPool = new PooledExecutor(new BoundedBuffer(threadPoolInitSize), threadPoolMaxSize); 263 m_threadPool.setKeepAliveTime(keepAliveTime); 264 m_threadPool.createThreads(threadPoolInitSize); 265 m_threadPool.setMinimumPoolSize(threadPoolMinSize); 266 if (waitWhenBlocked) { 267 m_threadPool.waitWhenBlocked(); 268 } 269 } 270 271 278 private void createDynamicThreadPool(final int threadPoolMinSize, 279 final int threadPoolInitSize, 280 final int keepAliveTime) { 281 m_threadPool = new PooledExecutor(new LinkedQueue()); 282 m_threadPool.setKeepAliveTime(keepAliveTime); 283 m_threadPool.createThreads(threadPoolInitSize); 284 m_threadPool.setMinimumPoolSize(threadPoolMinSize); 285 } 286 } | Popular Tags |