KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > com > go > trove > net > DistributedSocketFactory


1 /* ====================================================================
2  * Trove - Copyright (c) 1997-2000 Walt Disney Internet Group
3  * ====================================================================
4  * The Tea Software License, Version 1.1
5  *
6  * Copyright (c) 2000 Walt Disney Internet Group. All rights reserved.
7  *
8  * Redistribution and use in source and binary forms, with or without
9  * modification, are permitted provided that the following conditions
10  * are met:
11  *
12  * 1. Redistributions of source code must retain the above copyright
13  * notice, this list of conditions and the following disclaimer.
14  *
15  * 2. Redistributions in binary form must reproduce the above copyright
16  * notice, this list of conditions and the following disclaimer in
17  * the documentation and/or other materials provided with the
18  * distribution.
19  *
20  * 3. The end-user documentation included with the redistribution,
21  * if any, must include the following acknowledgment:
22  * "This product includes software developed by the
23  * Walt Disney Internet Group (http://opensource.go.com/)."
24  * Alternately, this acknowledgment may appear in the software itself,
25  * if and wherever such third-party acknowledgments normally appear.
26  *
27  * 4. The names "Tea", "TeaServlet", "Kettle", "Trove" and "BeanDoc" must
28  * not be used to endorse or promote products derived from this
29  * software without prior written permission. For written
30  * permission, please contact opensource@dig.com.
31  *
32  * 5. Products derived from this software may not be called "Tea",
33  * "TeaServlet", "Kettle" or "Trove", nor may "Tea", "TeaServlet",
34  * "Kettle", "Trove" or "BeanDoc" appear in their name, without prior
35  * written permission of the Walt Disney Internet Group.
36  *
37  * THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED
38  * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
39  * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
40  * DISCLAIMED. IN NO EVENT SHALL THE WALT DISNEY INTERNET GROUP OR ITS
41  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
42  * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
43  * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
44  * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY
45  * OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
46  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
47  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
48  * ====================================================================
49  *
50  * For more information about Tea, please see http://opensource.go.com/.
51  */

52
53 package com.go.trove.net;
54
55 import java.io.*;
56 import java.net.*;
57 import java.util.*;
58 import java.lang.ref.*;
59 import com.go.trove.util.IdentityMap;
60
61 /******************************************************************************
62  * A SocketFactory implementation for distributing load among several
63  * SocketFactories. If an exception occurs on a socket, its pool is put into
64  * the "dead" list. A special thread will run in the background, trying to
65  * resurrect the dead SocketSocket. As soon as its able to create sockets
66  * again, its added back into the "live" list.
67  * <p>
68  * Consider wrapping with a {@link LazySocketFactory} for automatic checking
69  * against socket factories that may be dead.
70  *
71  * @author Brian S O'Neill
72  * @version
73  * <!--$$Revision:--> 7 <!-- $-->, <!--$$JustDate:--> 01/01/22 <!-- $-->
74  */

75 public class DistributedSocketFactory implements SocketFactory {
76     private final long mTimeout;
77
78     private int mFactoryIndex;
79
80     // Contains only the live SocketFactories.
81
private List mFactories;
82
83     // Maps SocketPools to resurrector threads.
84
private Map mResurrectors;
85
86     // Maps CheckedSockets to the SocketPools that they came from.
87
private Map mSocketSources;
88
89     private CheckedSocket.ExceptionListener mListener;
90
91     /**
92      * @param timeout Maximum time to wait (in milliseconds) for new
93      * connections to be established before throwing an exception
94      */

95     public DistributedSocketFactory(long timeout) {
96         mTimeout = timeout;
97         mFactories = Collections.synchronizedList(new ArrayList());
98         mResurrectors = Collections.synchronizedMap(new HashMap());
99         mSocketSources = Collections.synchronizedMap(new IdentityMap());
100
101         mListener = new CheckedSocket.ExceptionListener() {
102             public void exceptionOccurred(CheckedSocket s, Exception JavaDoc e, int count) {
103                 if (count == 1) {
104                     deadFactory((SocketFactory)mSocketSources.get(s));
105                 }
106             }
107         };
108     }
109
110     public void addSocketFactory(SocketFactory factory) {
111         mFactories.add(factory);
112     }
113
114     public void removeSocketFactory(SocketFactory factory) {
115         mFactories.remove(factory);
116         Thread JavaDoc t = (Thread JavaDoc)mResurrectors.remove(factory);
117         if (t != null) {
118             t.interrupt();
119         }
120     }
121
122     public InetAddressAndPort getInetAddressAndPort() {
123         try {
124             return getFactory(selectFactory(null)).getInetAddressAndPort();
125         }
126         catch (ConnectException e) {
127             return InetAddressAndPort.UNKNOWN;
128         }
129     }
130
131     public InetAddressAndPort getInetAddressAndPort(Object JavaDoc session) {
132         try {
133             return getFactory(selectFactory(session))
134                 .getInetAddressAndPort(session);
135         }
136         catch (ConnectException e) {
137             return InetAddressAndPort.UNKNOWN;
138         }
139     }
140
141     public long getDefaultTimeout() {
142         return mTimeout;
143     }
144
145     public CheckedSocket createSocket()
146         throws ConnectException, SocketException
147     {
148         return createSocket(null, mTimeout);
149     }
150
151     public CheckedSocket createSocket(Object JavaDoc session)
152         throws ConnectException, SocketException
153     {
154         return createSocket(session, mTimeout);
155     }
156
157     public CheckedSocket createSocket(long timeout)
158         throws ConnectException, SocketException
159     {
160         return createSocket(null, timeout);
161     }
162
163     public CheckedSocket createSocket(Object JavaDoc session, long timeout)
164         throws ConnectException, SocketException
165     {
166         long startTime = timeout > 0 ? System.currentTimeMillis() : 0;
167         int index = selectFactory(session);
168         int count = mFactories.size();
169
170         for (int i=0; i<count; i++) {
171             SocketFactory factory = null;
172             try {
173                 factory = getFactory(index++);
174                 CheckedSocket socket = factory.createSocket(session, timeout);
175                 socket.addExceptionListener(mListener);
176                 mSocketSources.put(socket, factory);
177                 return socket;
178             }
179             catch (SocketException e) {
180                 deadFactory(factory);
181                 
182                 if (timeout == 0) {
183                     throw e;
184                 }
185                 
186                 if (timeout > 0) {
187                     timeout -= (System.currentTimeMillis() - startTime);
188                     if (timeout < 0) {
189                         throw e;
190                     }
191                 }
192             }
193         }
194
195         throw new ConnectException("Unable to create socket");
196     }
197
198     public CheckedSocket getSocket() throws ConnectException, SocketException {
199         return getSocket(null, mTimeout);
200     }
201
202     public CheckedSocket getSocket(Object JavaDoc session)
203         throws ConnectException, SocketException
204     {
205         return getSocket(session, mTimeout);
206     }
207
208     public CheckedSocket getSocket(long timeout)
209         throws ConnectException, SocketException
210     {
211         return getSocket(null, timeout);
212     }
213
214     public CheckedSocket getSocket(Object JavaDoc session, long timeout)
215         throws ConnectException, SocketException
216     {
217         long startTime = timeout > 0 ? System.currentTimeMillis() : 0;
218         int index = selectFactory(session);
219         int count = mFactories.size();
220
221         for (int i=0; i<count; i++) {
222             SocketFactory factory = null;
223             try {
224                 factory = getFactory(index++);
225                 CheckedSocket socket = factory.getSocket(session, timeout);
226                 socket.addExceptionListener(mListener);
227                 mSocketSources.put(socket, factory);
228                 return socket;
229             }
230             catch (SocketException e) {
231                 deadFactory(factory);
232                 
233                 if (timeout == 0) {
234                     throw e;
235                 }
236                 
237                 if (timeout > 0) {
238                     timeout -= (System.currentTimeMillis() - startTime);
239                     if (timeout < 0) {
240                         throw e;
241                     }
242                 }
243             }
244         }
245
246         throw new ConnectException("Unable to get socket");
247     }
248
249     public void recycleSocket(CheckedSocket socket)
250         throws SocketException, IllegalArgumentException JavaDoc
251     {
252         if (socket == null) {
253             return;
254         }
255
256         SocketFactory source = (SocketFactory)mSocketSources.remove(socket);
257
258         if (source == null) {
259             throw new IllegalArgumentException JavaDoc
260                 ("Socket did not originate from this pool");
261         }
262
263         socket.removeExceptionListener(mListener);
264         source.recycleSocket(socket);
265     }
266
267     public void clear() {
268         synchronized (mFactories) {
269             for (int i = mFactories.size(); --i >= 0; ) {
270                 ((SocketFactory)mFactories.get(i)).clear();
271             }
272         }
273     }
274
275     public int getAvailableCount() {
276         int count = 0;
277         synchronized (mFactories) {
278             for (int i = mFactories.size(); --i >= 0; ) {
279                 count += ((SocketFactory)mFactories.get(i))
280                     .getAvailableCount();
281             }
282         }
283         return count;
284     }
285
286     /**
287      * The provided index must be positive, but it can be out of the factory
288      * list bounds.
289      */

290     private SocketFactory getFactory(int index) throws ConnectException {
291         synchronized (mFactories) {
292             int size = mFactories.size();
293             if (size <= 0) {
294                 throw new ConnectException("No SocketFactories available");
295             }
296             return (SocketFactory)mFactories.get(index % size);
297         }
298     }
299
300     /**
301      * Returns an index which is positive, but may be out of the factory list
302      * bounds.
303      */

304     private int selectFactory(Object JavaDoc session) throws ConnectException {
305         if (session != null) {
306             return session.hashCode() & 0x7fffffff;
307         }
308         else {
309             synchronized (mFactories) {
310                 return mFactoryIndex++ & 0x7fffffff;
311             }
312         }
313     }
314
315     private void deadFactory(SocketFactory factory) {
316         if (factory == null) {
317             return;
318         }
319
320         synchronized (mFactories) {
321             // Only remove factory if its not the last one left.
322
if (mFactories.contains(factory) && mFactories.size() > 1) {
323                 mFactories.remove(factory);
324                 
325                 Resurrector r = new Resurrector(this, factory);
326                 Thread JavaDoc t = new Thread JavaDoc(null, r, "Resurrector " +
327                                       factory.getInetAddressAndPort());
328                 t.setDaemon(true);
329                 t.start();
330                 mResurrectors.put(factory, t);
331             }
332         }
333     }
334
335     private static class Resurrector implements Runnable JavaDoc {
336         // Weakly references owner so that this thread won't prevent it from
337
// being garbage collected.
338
private final Reference mOwner;
339         private final SocketFactory mFactory;
340
341         public Resurrector(DistributedSocketFactory owner,
342                            SocketFactory factory) {
343             mOwner = new WeakReference(owner);
344             mFactory = factory;
345         }
346
347         public void run() {
348             DistributedSocketFactory owner = null;
349             try {
350                 while (!Thread.interrupted()) {
351                     owner = (DistributedSocketFactory)mOwner.get();
352                     if (owner == null) {
353                         break;
354                     }
355
356                     try {
357                         mFactory.recycleSocket(mFactory.createSocket());
358                         owner.mFactories.add(mFactory);
359                         break;
360                     }
361                     catch (IOException e) {
362                     }
363                     
364                     owner = null;
365
366                     // Wait at 5 seconds before trying again.
367
try {
368                         Thread.sleep(5000);
369                     }
370                     catch (InterruptedException JavaDoc e) {
371                         break;
372                     }
373                 }
374             }
375             finally {
376                 owner = (DistributedSocketFactory)mOwner.get();
377                 if (owner != null) {
378                     owner.mResurrectors.remove(mFactory);
379                 }
380             }
381         }
382     }
383 }
384
Popular Tags