1 52 53 package com.go.trove.net; 54 55 import java.io.*; 56 import java.net.*; 57 import java.util.*; 58 import java.lang.ref.*; 59 import com.go.trove.util.IdentityMap; 60 61 75 public class DistributedSocketFactory implements SocketFactory { 76 private final long mTimeout; 77 78 private int mFactoryIndex; 79 80 private List mFactories; 82 83 private Map mResurrectors; 85 86 private Map mSocketSources; 88 89 private CheckedSocket.ExceptionListener mListener; 90 91 95 public DistributedSocketFactory(long timeout) { 96 mTimeout = timeout; 97 mFactories = Collections.synchronizedList(new ArrayList()); 98 mResurrectors = Collections.synchronizedMap(new HashMap()); 99 mSocketSources = Collections.synchronizedMap(new IdentityMap()); 100 101 mListener = new CheckedSocket.ExceptionListener() { 102 public void exceptionOccurred(CheckedSocket s, Exception e, int count) { 103 if (count == 1) { 104 deadFactory((SocketFactory)mSocketSources.get(s)); 105 } 106 } 107 }; 108 } 109 110 public void addSocketFactory(SocketFactory factory) { 111 mFactories.add(factory); 112 } 113 114 public void removeSocketFactory(SocketFactory factory) { 115 mFactories.remove(factory); 116 Thread t = (Thread )mResurrectors.remove(factory); 117 if (t != null) { 118 t.interrupt(); 119 } 120 } 121 122 public InetAddressAndPort getInetAddressAndPort() { 123 try { 124 return getFactory(selectFactory(null)).getInetAddressAndPort(); 125 } 126 catch (ConnectException e) { 127 return InetAddressAndPort.UNKNOWN; 128 } 129 } 130 131 public InetAddressAndPort getInetAddressAndPort(Object session) { 132 try { 133 return getFactory(selectFactory(session)) 134 .getInetAddressAndPort(session); 135 } 136 catch (ConnectException e) { 137 return InetAddressAndPort.UNKNOWN; 138 } 139 } 140 141 public long getDefaultTimeout() { 142 return mTimeout; 143 } 144 145 public CheckedSocket createSocket() 146 throws ConnectException, SocketException 147 { 148 return createSocket(null, mTimeout); 149 } 150 151 public CheckedSocket createSocket(Object session) 152 throws ConnectException, SocketException 153 { 154 return createSocket(session, mTimeout); 155 } 156 157 public CheckedSocket createSocket(long timeout) 158 throws ConnectException, SocketException 159 { 160 return createSocket(null, timeout); 161 } 162 163 public CheckedSocket createSocket(Object session, long timeout) 164 throws ConnectException, SocketException 165 { 166 long startTime = timeout > 0 ? System.currentTimeMillis() : 0; 167 int index = selectFactory(session); 168 int count = mFactories.size(); 169 170 for (int i=0; i<count; i++) { 171 SocketFactory factory = null; 172 try { 173 factory = getFactory(index++); 174 CheckedSocket socket = factory.createSocket(session, timeout); 175 socket.addExceptionListener(mListener); 176 mSocketSources.put(socket, factory); 177 return socket; 178 } 179 catch (SocketException e) { 180 deadFactory(factory); 181 182 if (timeout == 0) { 183 throw e; 184 } 185 186 if (timeout > 0) { 187 timeout -= (System.currentTimeMillis() - startTime); 188 if (timeout < 0) { 189 throw e; 190 } 191 } 192 } 193 } 194 195 throw new ConnectException("Unable to create socket"); 196 } 197 198 public CheckedSocket getSocket() throws ConnectException, SocketException { 199 return getSocket(null, mTimeout); 200 } 201 202 public CheckedSocket getSocket(Object session) 203 throws ConnectException, SocketException 204 { 205 return getSocket(session, mTimeout); 206 } 207 208 public CheckedSocket getSocket(long timeout) 209 throws ConnectException, SocketException 210 { 211 return getSocket(null, timeout); 212 } 213 214 public CheckedSocket getSocket(Object session, long timeout) 215 throws ConnectException, SocketException 216 { 217 long startTime = timeout > 0 ? System.currentTimeMillis() : 0; 218 int index = selectFactory(session); 219 int count = mFactories.size(); 220 221 for (int i=0; i<count; i++) { 222 SocketFactory factory = null; 223 try { 224 factory = getFactory(index++); 225 CheckedSocket socket = factory.getSocket(session, timeout); 226 socket.addExceptionListener(mListener); 227 mSocketSources.put(socket, factory); 228 return socket; 229 } 230 catch (SocketException e) { 231 deadFactory(factory); 232 233 if (timeout == 0) { 234 throw e; 235 } 236 237 if (timeout > 0) { 238 timeout -= (System.currentTimeMillis() - startTime); 239 if (timeout < 0) { 240 throw e; 241 } 242 } 243 } 244 } 245 246 throw new ConnectException("Unable to get socket"); 247 } 248 249 public void recycleSocket(CheckedSocket socket) 250 throws SocketException, IllegalArgumentException 251 { 252 if (socket == null) { 253 return; 254 } 255 256 SocketFactory source = (SocketFactory)mSocketSources.remove(socket); 257 258 if (source == null) { 259 throw new IllegalArgumentException 260 ("Socket did not originate from this pool"); 261 } 262 263 socket.removeExceptionListener(mListener); 264 source.recycleSocket(socket); 265 } 266 267 public void clear() { 268 synchronized (mFactories) { 269 for (int i = mFactories.size(); --i >= 0; ) { 270 ((SocketFactory)mFactories.get(i)).clear(); 271 } 272 } 273 } 274 275 public int getAvailableCount() { 276 int count = 0; 277 synchronized (mFactories) { 278 for (int i = mFactories.size(); --i >= 0; ) { 279 count += ((SocketFactory)mFactories.get(i)) 280 .getAvailableCount(); 281 } 282 } 283 return count; 284 } 285 286 290 private SocketFactory getFactory(int index) throws ConnectException { 291 synchronized (mFactories) { 292 int size = mFactories.size(); 293 if (size <= 0) { 294 throw new ConnectException("No SocketFactories available"); 295 } 296 return (SocketFactory)mFactories.get(index % size); 297 } 298 } 299 300 304 private int selectFactory(Object session) throws ConnectException { 305 if (session != null) { 306 return session.hashCode() & 0x7fffffff; 307 } 308 else { 309 synchronized (mFactories) { 310 return mFactoryIndex++ & 0x7fffffff; 311 } 312 } 313 } 314 315 private void deadFactory(SocketFactory factory) { 316 if (factory == null) { 317 return; 318 } 319 320 synchronized (mFactories) { 321 if (mFactories.contains(factory) && mFactories.size() > 1) { 323 mFactories.remove(factory); 324 325 Resurrector r = new Resurrector(this, factory); 326 Thread t = new Thread (null, r, "Resurrector " + 327 factory.getInetAddressAndPort()); 328 t.setDaemon(true); 329 t.start(); 330 mResurrectors.put(factory, t); 331 } 332 } 333 } 334 335 private static class Resurrector implements Runnable { 336 private final Reference mOwner; 339 private final SocketFactory mFactory; 340 341 public Resurrector(DistributedSocketFactory owner, 342 SocketFactory factory) { 343 mOwner = new WeakReference(owner); 344 mFactory = factory; 345 } 346 347 public void run() { 348 DistributedSocketFactory owner = null; 349 try { 350 while (!Thread.interrupted()) { 351 owner = (DistributedSocketFactory)mOwner.get(); 352 if (owner == null) { 353 break; 354 } 355 356 try { 357 mFactory.recycleSocket(mFactory.createSocket()); 358 owner.mFactories.add(mFactory); 359 break; 360 } 361 catch (IOException e) { 362 } 363 364 owner = null; 365 366 try { 368 Thread.sleep(5000); 369 } 370 catch (InterruptedException e) { 371 break; 372 } 373 } 374 } 375 finally { 376 owner = (DistributedSocketFactory)mOwner.get(); 377 if (owner != null) { 378 owner.mResurrectors.remove(mFactory); 379 } 380 } 381 } 382 } 383 } 384 | Popular Tags |