1 9 package org.jboss.remoting.transport.socket; 10 11 import java.io.IOException ; 12 import java.net.InetAddress ; 13 import java.net.ServerSocket ; 14 import java.net.Socket ; 15 import java.util.Iterator ; 16 import java.util.LinkedList ; 17 import java.util.Map ; 18 import java.util.Properties ; 19 import java.util.Set ; 20 import org.jboss.remoting.InvokerLocator; 21 import org.jboss.remoting.ServerInvoker; 22 import org.jboss.remoting.marshal.serializable.SerializableMarshaller; 23 import org.jboss.util.propertyeditor.PropertyEditors; 24 25 33 public class SocketServerInvoker extends ServerInvoker implements Runnable , SocketServerInvokerMBean 34 { 35 private InetAddress addr; 36 private int port; 37 static int clientCount = 0; 38 39 private Properties props = new Properties (); 40 41 private static int BACKLOG_DEFAULT = 200; 42 private static int MAX_POOL_SIZE_DEFAULT = 300; 43 44 47 public static final String SERVER_SOCKET_CLASS_FLAG = "serverSocketClass"; 48 private String serverSocketClass = ServerSocketWrapper.class.getName(); 49 50 protected ServerSocket serverSocket = null; 51 protected boolean running = false; 52 protected int backlog = BACKLOG_DEFAULT; 53 protected Thread [] acceptThreads; 54 protected int numAcceptThreads = 1; 55 protected int maxPoolSize = MAX_POOL_SIZE_DEFAULT; 56 protected LRUPool clientpool; 57 protected LinkedList threadpool; 58 protected int timeout = 60000; 60 63 protected boolean trace = false; 64 65 66 public SocketServerInvoker(InvokerLocator locator) 67 { 68 super(locator); 69 } 70 71 public SocketServerInvoker(InvokerLocator locator, Map configuration) 72 { 73 super(locator, configuration); 74 } 75 76 public InetAddress getAddress() 77 { 78 return addr; 79 } 80 81 public int getPort() 82 { 83 return port; 84 } 85 86 public Properties getProperties() 87 { 88 return props; 89 } 90 91 protected void setup() 92 throws Exception 93 { 94 props.putAll(getConfiguration()); 95 PropertyEditors.mapJavaBeanProperties(this, props, false); 96 97 super.setup(); 98 99 String ssclass = props.getProperty(SERVER_SOCKET_CLASS_FLAG); 100 if(ssclass != null) 101 { 102 serverSocketClass = ssclass; 103 } 104 105 } 106 107 protected void finalize() throws Throwable 108 { 109 stop(); 110 super.finalize(); 111 } 112 113 119 public synchronized void start() throws IOException 120 { 121 122 trace = log.isTraceEnabled(); 123 124 if(!running) 125 { 126 running = true; 127 128 InetAddress bindAddress = InetAddress.getByName(getServerBindAddress()); 129 130 if(maxPoolSize <= 0) 131 { 132 maxPoolSize = MAX_POOL_SIZE_DEFAULT; 134 } 135 clientpool = new LRUPool(2, maxPoolSize); 136 clientpool.create(); 137 threadpool = new LinkedList (); 138 try 139 { 140 serverSocket = createServerSocket(getServerBindPort(), backlog, bindAddress); 141 } 142 catch(IOException e) 143 { 144 log.error("Error starting ServerSocket. Bind port: " + getServerBindPort() + ", bind address: " + bindAddress); 145 throw e; 146 } 147 148 acceptThreads = new Thread [numAcceptThreads]; 149 for(int i = 0; i < numAcceptThreads; i++) 150 { 151 String name = "SocketServerInvoker#" + i + "-" + getServerBindPort(); 152 acceptThreads[i] = new Thread (this, name); 153 acceptThreads[i].start(); 154 } 155 } 156 super.start(); 157 } 158 159 protected ServerSocket createServerSocket(int serverBindPort, int backlog, InetAddress bindAddress) throws IOException 160 { 161 ServerSocket svrSocket = new ServerSocket (serverBindPort, backlog, bindAddress); 162 log.debug("Created server socket: " + svrSocket); 163 return svrSocket; 164 } 165 166 public void destroy() 167 { 168 clientpool.destroy(); 169 } 170 171 177 public synchronized void stop() 178 { 179 180 if(running) 181 { 182 running = false; 183 184 maxPoolSize = 0; for(int i = 0; i < acceptThreads.length; i++) 186 { 187 try 188 { 189 acceptThreads[i].interrupt(); 190 } 191 catch(Exception ignored) 192 { 193 } 194 } 195 Set svrThreads = clientpool.getContents(); 196 Iterator itr = svrThreads.iterator(); 197 while(itr.hasNext()) 198 { 199 Object o = itr.next(); 200 ServerThread st = (ServerThread) o; 201 st.shutdown(); 202 } 203 clientpool.flush(); 204 clientpool.stop(); 205 for(int i = 0; i < threadpool.size(); i++) 206 { 207 ServerThread thread = (ServerThread) threadpool.removeFirst(); 208 thread.shutdown(); 209 } 210 211 try 212 { 213 serverSocket.close(); 214 } 215 catch(Exception e) 216 { 217 } 218 } 219 super.stop(); 220 } 221 222 228 public int getSocketTimeout() 229 { 230 return timeout; 231 } 232 233 239 public void setSocketTimeout(int time) 240 { 241 this.timeout = time; 242 } 243 244 248 public int getCurrentThreadPoolSize() 249 { 250 return threadpool.size(); 251 } 252 253 257 public int getCurrentClientPoolSize() 258 { 259 return clientpool.size(); 260 } 261 262 268 public int getNumAcceptThreads() 269 { 270 return numAcceptThreads; 271 } 272 273 279 public void setNumAcceptThreads(int size) 280 { 281 this.numAcceptThreads = size; 282 } 283 284 291 public int getMaxPoolSize() 292 { 293 return maxPoolSize; 294 } 295 296 302 public void setMaxPoolSize(int maxPoolSize) 303 { 304 this.maxPoolSize = maxPoolSize; 305 } 306 307 310 public int getBacklog() 311 { 312 return backlog; 313 } 314 315 318 public void setBacklog(int backlog) 319 { 320 if(backlog < 0) 321 { 322 this.backlog = BACKLOG_DEFAULT; 323 } 324 else 325 { 326 this.backlog = backlog; 327 } 328 } 329 330 331 public void run() 332 { 333 while(running) 334 { 335 try 336 { 337 Socket socket = serverSocket.accept(); 338 if(trace) 339 { 340 log.trace("Accepted: " + socket); 341 } 342 ServerThread thread = null; 343 boolean newThread = false; 344 345 while(thread == null) 346 { 347 synchronized(threadpool) 348 { 349 if(threadpool.size() > 0) 350 { 351 thread = (ServerThread) threadpool.removeFirst(); 352 } 353 } 354 if(thread == null) 355 { 356 synchronized(clientpool) 357 { 358 if(clientpool.size() < maxPoolSize) 359 { 360 thread = new ServerThread(socket, this, clientpool, threadpool, timeout, serverSocketClass); 361 newThread = true; 362 } 363 if(thread == null) 364 { 365 clientpool.evict(); 366 if(trace) 367 { 368 log.trace("Waiting for a thread..."); 369 } 370 clientpool.wait(); 371 if(trace) 372 { 373 log.trace("Notified of available thread"); 374 } 375 } 376 } 377 } 378 } 379 synchronized(clientpool) 380 { 381 clientpool.insert(thread, thread); 382 } 383 384 if(newThread) 385 { 386 if(trace) 387 { 388 log.trace("Created a new thread, t=" + thread); 389 } 390 thread.start(); 391 } 392 else 393 { 394 if(trace) 395 { 396 log.trace("Reusing thread t=" + thread); 397 } 398 thread.wakeup(socket, timeout); 399 } 400 } 401 catch(Throwable ex) 402 { 403 if(running) 404 { 405 log.error("Failed to accept socket connection", ex); 406 } 407 } 408 } 409 } 410 411 418 public boolean isTransportBiDirectional() 419 { 420 return true; 421 } 422 423 430 protected String getDefaultDataType() 431 { 432 return SerializableMarshaller.DATATYPE; 433 } 434 435 } 436 | Popular Tags |