1 9 package org.jboss.remoting.transport.socket; 10 11 import java.io.IOException ; 12 import java.lang.reflect.Constructor ; 13 import java.net.InetAddress ; 14 import java.net.Socket ; 15 import java.rmi.ConnectException ; 16 import java.util.HashMap ; 17 import java.util.Iterator ; 18 import java.util.LinkedList ; 19 import java.util.Map ; 20 import org.jboss.remoting.CannotConnectException; 21 import org.jboss.remoting.ConnectionFailedException; 22 import org.jboss.remoting.InvokerLocator; 23 import org.jboss.remoting.RemoteClientInvoker; 24 import org.jboss.remoting.marshal.Marshaller; 25 import org.jboss.remoting.marshal.UnMarshaller; 26 import org.jboss.remoting.marshal.serializable.SerializableMarshaller; 27 28 36 public class SocketClientInvoker extends RemoteClientInvoker 37 { 38 private InetAddress addr; 39 private int port; 40 41 public static final String TCP_NODELAY_FLAG = "enableTcpNoDelay"; 42 public static final String MAX_POOL_SIZE_FLAG = "clientMaxPoolSize"; 43 public static final String SO_TIMEOUT_FLAG = "socketTimeout"; 44 public static final String CLIENT_SOCKET_CLASS_FLAG = "clientSocketClass"; 45 46 public static final int SO_TIMEOUT_DEFAULT = 60000; public static final boolean TCP_NODELAY_DEFAULT = false; 48 49 public static long getSocketTime = 0; 51 public static long readTime = 0; 52 public static long writeTime = 0; 53 public static long serializeTime = 0; 54 public static long deserializeTime = 0; 55 56 59 protected boolean enableTcpNoDelay = TCP_NODELAY_DEFAULT; 60 61 protected int timeout = SO_TIMEOUT_DEFAULT; 62 63 protected String clientSocketClassName = ClientSocketWrapper.class.getName(); 64 private Constructor clientSocketConstructor = null; 65 66 69 public static final int MAX_RETRIES = 10; 70 public static long usedPooled = 0; 71 72 protected int numberOfRetries = MAX_RETRIES; 73 74 78 protected LinkedList pool = null; 79 80 83 protected ServerAddress address; 84 85 protected static HashMap connectionPools = new HashMap (); 86 protected int maxPoolSize = 10; 87 88 89 public SocketClientInvoker(InvokerLocator locator) 90 throws IOException 91 { 92 super(locator); 93 try 94 { 95 setup(); 96 } 97 catch(Exception ex) 98 { 99 throw new RuntimeException (ex.getMessage()); 100 } 101 } 102 103 protected void setup() 104 throws Exception 105 { 106 this.addr = InetAddress.getByName(locator.getHost()); 107 this.port = locator.getPort(); 108 109 configureParameters(); 110 111 address = new ServerAddress(addr.getHostAddress(), port, enableTcpNoDelay, timeout); 112 } 113 114 private void configureParameters() 115 { 116 Map params = locator.getParameters(); 117 if(params != null) 118 { 119 Object val = params.get(TCP_NODELAY_FLAG); 121 if(val != null) 122 { 123 try 124 { 125 boolean bVal = Boolean.valueOf((String ) val).booleanValue(); 126 enableTcpNoDelay = bVal; 127 log.debug("Setting SocketClientInvoker::enableTcpNoDelay to: " + enableTcpNoDelay); 128 } 129 catch(Exception e) 130 { 131 log.warn("Could not convert " + TCP_NODELAY_FLAG + " value of " + val + " to a boolean value."); 132 } 133 } 134 val = params.get(MAX_POOL_SIZE_FLAG); 136 if(val != null) 137 { 138 try 139 { 140 int nVal = Integer.valueOf((String ) val).intValue(); 141 maxPoolSize = nVal; 142 log.debug("Setting SocketClientInvoker::maxPoolSize to: " + maxPoolSize); 143 } 144 catch(Exception e) 145 { 146 log.warn("Could not convert " + MAX_POOL_SIZE_FLAG + " value of " + val + " to a int value."); 147 } 148 } 149 val = params.get(SO_TIMEOUT_FLAG); 151 if(val != null) 152 { 153 try 154 { 155 int nVal = Integer.valueOf((String ) val).intValue(); 156 timeout = nVal; 157 log.debug("Setting SocketClientInvoker::timeout to: " + timeout); 158 } 159 catch(Exception e) 160 { 161 log.warn("Could not convert " + SO_TIMEOUT_FLAG + " value of " + val + " to a int value."); 162 } 163 } 164 val = params.get(CLIENT_SOCKET_CLASS_FLAG); 166 if(val != null) 167 { 168 String value = (String ) val; 169 if(value.length() > 0) 170 { 171 clientSocketClassName = value; 172 log.debug("Setting ClientSocket class name to: " + clientSocketClassName); 173 } 174 } 175 } 176 } 177 178 protected void finalize() throws Throwable 179 { 180 disconnect(); 181 super.finalize(); 182 } 183 184 protected synchronized void handleConnect() 185 throws ConnectionFailedException 186 { 187 initPool(); 188 } 189 190 protected synchronized void handleDisconnect() 191 { 192 clearPools(); 193 } 194 195 202 protected String getDefaultDataType() 203 { 204 return SerializableMarshaller.DATATYPE; 205 } 206 207 216 protected Object transport(String sessionId, Object invocation, Map metadata, 217 Marshaller marshaller, UnMarshaller unmarshaller) 218 throws IOException , ConnectionFailedException, ClassNotFoundException 219 { 220 221 Object response = null; 222 long start = System.currentTimeMillis(); 223 SocketWrapper socketWrapper = null; 224 try 225 { 226 socketWrapper = getConnection(); 227 } 228 catch(Exception e) 229 { 230 throw new CannotConnectException("Can not get connection to server. Problem establishing socket connection.", e); 231 } 232 long end = System.currentTimeMillis() - start; 233 getSocketTime += end; 234 try 235 { 236 marshaller.write(invocation, socketWrapper.getOutputStream()); 237 238 end = System.currentTimeMillis() - start; 239 writeTime += end; 240 start = System.currentTimeMillis(); 241 242 response = unmarshaller.read(socketWrapper.getInputStream(), null); 243 244 end = System.currentTimeMillis() - start; 245 readTime += end; 246 } 247 catch(Exception ex) 248 { 249 try 250 { 251 socketWrapper.close(); 252 } 253 catch(Exception ignored) 254 { 255 } 256 log.error("Got marshalling exception, exiting", ex); 257 if(ex instanceof ClassNotFoundException ) 258 { 259 log.error("Error loading classes from remote call result.", ex); 261 throw (ClassNotFoundException ) ex; 262 } 263 264 throw new ConnectException ("Failed to communicate. Problem during marshalling/unmarshalling", ex); 265 } 266 267 synchronized(pool) 269 { 270 if(pool.size() < maxPoolSize) 271 { 272 pool.add(socketWrapper); 273 } 274 else 275 { 276 try 277 { 278 socketWrapper.close(); 279 } 280 catch(Exception ignored) 281 { 282 } 283 } 284 } 285 286 if(log.isTraceEnabled()) 288 { 289 log.trace("Response: " + response); 290 } 291 292 return response; 293 294 } 295 296 299 public static void clearPool(ServerAddress sa) 300 { 301 try 302 { 303 LinkedList thepool = (LinkedList ) connectionPools.get(sa); 304 if(thepool == null) 305 { 306 return; 307 } 308 synchronized(thepool) 309 { 310 int size = thepool.size(); 311 for(int i = 0; i < size; i++) 312 { 313 SocketWrapper socketWrapper = (SocketWrapper) thepool.removeFirst(); 314 try 315 { 316 socketWrapper.close(); 317 socketWrapper = null; 318 } 319 catch(Exception ignored) 320 { 321 } 322 } 323 } 324 } 325 catch(Exception ex) 326 { 327 } 329 } 330 331 334 public static void clearPools() 335 { 336 synchronized(connectionPools) 337 { 338 Iterator it = connectionPools.keySet().iterator(); 339 while(it.hasNext()) 340 { 341 ServerAddress sa = (ServerAddress) it.next(); 342 clearPool(sa); 343 } 344 } 345 } 346 347 protected void initPool() 348 { 349 synchronized(connectionPools) 350 { 351 pool = (LinkedList ) connectionPools.get(address); 352 if(pool == null) 353 { 354 pool = new LinkedList (); 355 connectionPools.put(address, pool); 356 } 357 } 358 } 359 360 365 public void setNumberOfRetries(int numberOfRetries) 366 { 367 if(numberOfRetries < 1) 368 { 369 this.numberOfRetries = MAX_RETRIES; 370 } 371 else 372 { 373 this.numberOfRetries = numberOfRetries; 374 } 375 } 376 377 public int getNumberOfRetries() 378 { 379 return numberOfRetries; 380 } 381 382 protected SocketWrapper getConnection() throws Exception 383 { 384 Exception failed = null; 385 Socket socket = null; 386 387 388 for(int i = 0; i < numberOfRetries; i++) 401 { 402 synchronized(pool) 403 { 404 if(pool.size() > 0) 405 { 406 SocketWrapper pooled = getPooledConnection(); 407 if(pooled != null) 408 { 409 usedPooled++; 410 return pooled; 411 } 412 } 413 } 414 415 try 416 { 417 socket = createSocket(address.address, address.port); 418 break; 419 } 420 catch(Exception ex) 421 { 422 if(i + 1 < MAX_RETRIES) 423 { 424 Thread.sleep(1); 425 continue; 426 } 427 throw ex; 428 } 429 } 430 socket.setTcpNoDelay(address.enableTcpNoDelay); 431 return createClientSocket(socket, address.timeout); 432 } 433 434 private SocketWrapper createClientSocket(Socket socket, int timeout) throws Exception 435 { 436 if(clientSocketConstructor == null) 437 { 438 ClassLoader classLoader = getClassLoader(); 439 if(classLoader == null) 440 { 441 classLoader = Thread.currentThread().getContextClassLoader(); 442 443 if(classLoader == null) 444 { 445 classLoader = getClass().getClassLoader(); 446 } 447 } 448 Class cl = classLoader.loadClass(clientSocketClassName); 449 450 clientSocketConstructor = cl.getConstructor(new Class []{Socket .class}); 451 } 452 SocketWrapper clientSocketWrapper = (SocketWrapper) clientSocketConstructor.newInstance(new Object []{socket}); 453 clientSocketWrapper.setTimeout(timeout); 454 455 return clientSocketWrapper; 456 } 457 458 protected Socket createSocket(String address, int port) throws IOException 459 { 460 return new Socket (address, port); 461 } 462 463 protected SocketWrapper getPooledConnection() 464 { 465 SocketWrapper socketWrapper = null; 466 while(pool.size() > 0) 467 { 468 socketWrapper = (SocketWrapper) pool.removeFirst(); 469 try 470 { 471 if(socketWrapper != null) 472 { 473 socketWrapper.checkConnection(); 474 return socketWrapper; 475 } 476 } 477 catch(Exception ex) 478 { 479 try 480 { 481 socketWrapper.close(); 482 } 483 catch(Exception ignored) 484 { 485 } 486 } 487 } 488 return null; 489 } 490 491 492 495 public String getServerHostName() throws Exception 496 { 497 return address.address; 498 } 499 500 501 } 502 | Popular Tags |