KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jboss > remoting > transport > socket > SocketServerInvoker


1 /***************************************
2  * *
3  * JBoss: The OpenSource J2EE WebOS *
4  * *
5  * Distributable under LGPL license. *
6  * See terms of license at gnu.org. *
7  * *
8  ***************************************/

9 package org.jboss.remoting.transport.socket;
10
11 import java.io.IOException JavaDoc;
12 import java.net.InetAddress JavaDoc;
13 import java.net.ServerSocket JavaDoc;
14 import java.net.Socket JavaDoc;
15 import java.util.Iterator JavaDoc;
16 import java.util.LinkedList JavaDoc;
17 import java.util.Map JavaDoc;
18 import java.util.Properties JavaDoc;
19 import java.util.Set JavaDoc;
20 import org.jboss.remoting.InvokerLocator;
21 import org.jboss.remoting.ServerInvoker;
22 import org.jboss.remoting.marshal.serializable.SerializableMarshaller;
23 import org.jboss.util.propertyeditor.PropertyEditors;
24
25 /**
26  * SocketServerInvoker is the server-side of a SOCKET based transport
27  *
28  * @author <a HREF="mailto:jhaynie@vocalocity.net">Jeff Haynie</a>
29  * @author <a HREF="mailto:tom.elrod@jboss.com">Tom Elrod</a>
30  * @version $Revision: 1.6 $
31  * @jmx:mbean
32  */

33 public class SocketServerInvoker extends ServerInvoker implements Runnable JavaDoc, SocketServerInvokerMBean
34 {
35    private InetAddress JavaDoc addr;
36    private int port;
37    static int clientCount = 0;
38
39    private Properties JavaDoc props = new Properties JavaDoc();
40
41    private static int BACKLOG_DEFAULT = 200;
42    private static int MAX_POOL_SIZE_DEFAULT = 300;
43
44    /**
45     * Property value to set if want a custom socket wrapper for providing custom streams.
46     */

47    public static final String JavaDoc SERVER_SOCKET_CLASS_FLAG = "serverSocketClass";
48    private String JavaDoc serverSocketClass = ServerSocketWrapper.class.getName();
49
50    protected ServerSocket JavaDoc serverSocket = null;
51    protected boolean running = false;
52    protected int backlog = BACKLOG_DEFAULT;
53    protected Thread JavaDoc[] acceptThreads;
54    protected int numAcceptThreads = 1;
55    protected int maxPoolSize = MAX_POOL_SIZE_DEFAULT;
56    protected LRUPool clientpool;
57    protected LinkedList JavaDoc threadpool;
58    protected int timeout = 60000; // 60 seconds.
59

60    /**
61     * The logging trace level flag
62     */

63    protected boolean trace = false;
64
65
66    public SocketServerInvoker(InvokerLocator locator)
67    {
68       super(locator);
69    }
70
71    public SocketServerInvoker(InvokerLocator locator, Map JavaDoc configuration)
72    {
73       super(locator, configuration);
74    }
75
76    public InetAddress JavaDoc getAddress()
77    {
78       return addr;
79    }
80
81    public int getPort()
82    {
83       return port;
84    }
85
86    public Properties JavaDoc getProperties()
87    {
88       return props;
89    }
90
91    protected void setup()
92          throws Exception JavaDoc
93    {
94       props.putAll(getConfiguration());
95       PropertyEditors.mapJavaBeanProperties(this, props, false);
96
97       super.setup();
98
99       String JavaDoc ssclass = props.getProperty(SERVER_SOCKET_CLASS_FLAG);
100       if(ssclass != null)
101       {
102          serverSocketClass = ssclass;
103       }
104
105    }
106
107    protected void finalize() throws Throwable JavaDoc
108    {
109       stop();
110       super.finalize();
111    }
112
113    /**
114     * Starts the invoker.
115     *
116     * @jmx.managed-operation description = "Start sets up the ServerInvoker we are wrapping."
117     * impact = "ACTION"
118     */

119    public synchronized void start() throws IOException JavaDoc
120    {
121
122       trace = log.isTraceEnabled();
123
124       if(!running)
125       {
126          running = true;
127
128          InetAddress JavaDoc bindAddress = InetAddress.getByName(getServerBindAddress());
129
130          if(maxPoolSize <= 0)
131          {
132             //need to reset to default
133
maxPoolSize = MAX_POOL_SIZE_DEFAULT;
134          }
135          clientpool = new LRUPool(2, maxPoolSize);
136          clientpool.create();
137          threadpool = new LinkedList JavaDoc();
138          try
139          {
140             serverSocket = createServerSocket(getServerBindPort(), backlog, bindAddress);
141          }
142          catch(IOException JavaDoc e)
143          {
144             log.error("Error starting ServerSocket. Bind port: " + getServerBindPort() + ", bind address: " + bindAddress);
145             throw e;
146          }
147
148          acceptThreads = new Thread JavaDoc[numAcceptThreads];
149          for(int i = 0; i < numAcceptThreads; i++)
150          {
151             String JavaDoc name = "SocketServerInvoker#" + i + "-" + getServerBindPort();
152             acceptThreads[i] = new Thread JavaDoc(this, name);
153             acceptThreads[i].start();
154          }
155       }
156       super.start();
157    }
158
159    protected ServerSocket JavaDoc createServerSocket(int serverBindPort, int backlog, InetAddress JavaDoc bindAddress) throws IOException JavaDoc
160    {
161       ServerSocket JavaDoc svrSocket = new ServerSocket JavaDoc(serverBindPort, backlog, bindAddress);
162       log.debug("Created server socket: " + svrSocket);
163       return svrSocket;
164    }
165
166    public void destroy()
167    {
168       clientpool.destroy();
169    }
170
171    /**
172     * Stops the invoker.
173     *
174     * @jmx.managed-operation description = "Stops the invoker."
175     * impact = "ACTION"
176     */

177    public synchronized void stop()
178    {
179
180       if(running)
181       {
182          running = false;
183
184          maxPoolSize = 0; // so ServerThreads don't reinsert themselves
185
for(int i = 0; i < acceptThreads.length; i++)
186          {
187             try
188             {
189                acceptThreads[i].interrupt();
190             }
191             catch(Exception JavaDoc ignored)
192             {
193             }
194          }
195          Set JavaDoc svrThreads = clientpool.getContents();
196          Iterator JavaDoc itr = svrThreads.iterator();
197          while(itr.hasNext())
198          {
199             Object JavaDoc o = itr.next();
200             ServerThread st = (ServerThread) o;
201             st.shutdown();
202          }
203          clientpool.flush();
204          clientpool.stop();
205          for(int i = 0; i < threadpool.size(); i++)
206          {
207             ServerThread thread = (ServerThread) threadpool.removeFirst();
208             thread.shutdown();
209          }
210
211          try
212          {
213             serverSocket.close();
214          }
215          catch(Exception JavaDoc e)
216          {
217          }
218       }
219       super.stop();
220    }
221
222    /**
223     * Getter for property timeout
224     *
225     * @return Value of property timeout
226     * @jmx:managed-attribute
227     */

228    public int getSocketTimeout()
229    {
230       return timeout;
231    }
232
233    /**
234     * Setter for property timeout
235     *
236     * @param time New value of property timeout
237     * @jmx:managed-attribute
238     */

239    public void setSocketTimeout(int time)
240    {
241       this.timeout = time;
242    }
243
244    /**
245     * @return Value of property serverBindPort.
246     * @jmx:managed-attribute
247     */

248    public int getCurrentThreadPoolSize()
249    {
250       return threadpool.size();
251    }
252
253    /**
254     * @return Value of property serverBindPort.
255     * @jmx:managed-attribute
256     */

257    public int getCurrentClientPoolSize()
258    {
259       return clientpool.size();
260    }
261
262    /**
263     * Getter for property numAcceptThreads
264     *
265     * @return The number of threads that exist for accepting client connections
266     * @jmx:managed-attribute
267     */

268    public int getNumAcceptThreads()
269    {
270       return numAcceptThreads;
271    }
272
273    /**
274     * Setter for property numAcceptThreads
275     *
276     * @param size The number of threads that exist for accepting client connections
277     * @jmx:managed-attribute
278     */

279    public void setNumAcceptThreads(int size)
280    {
281       this.numAcceptThreads = size;
282    }
283
284    /**
285     * Setter for max pool size.
286     * The number of server threads for processing client. The default is 300.
287     *
288     * @return
289     * @jmx:managed-attribute
290     */

291    public int getMaxPoolSize()
292    {
293       return maxPoolSize;
294    }
295
296    /**
297     * The number of server threads for processing client. The default is 300.
298     *
299     * @param maxPoolSize
300     * @jmx:managed-attribute
301     */

302    public void setMaxPoolSize(int maxPoolSize)
303    {
304       this.maxPoolSize = maxPoolSize;
305    }
306
307    /**
308     * @jmx:managed-attribute
309     */

310    public int getBacklog()
311    {
312       return backlog;
313    }
314
315    /**
316     * @jmx:managed-attribute
317     */

318    public void setBacklog(int backlog)
319    {
320       if(backlog < 0)
321       {
322          this.backlog = BACKLOG_DEFAULT;
323       }
324       else
325       {
326          this.backlog = backlog;
327       }
328    }
329
330
331    public void run()
332    {
333       while(running)
334       {
335          try
336          {
337             Socket JavaDoc socket = serverSocket.accept();
338             if(trace)
339             {
340                log.trace("Accepted: " + socket);
341             }
342             ServerThread thread = null;
343             boolean newThread = false;
344
345             while(thread == null)
346             {
347                synchronized(threadpool)
348                {
349                   if(threadpool.size() > 0)
350                   {
351                      thread = (ServerThread) threadpool.removeFirst();
352                   }
353                }
354                if(thread == null)
355                {
356                   synchronized(clientpool)
357                   {
358                      if(clientpool.size() < maxPoolSize)
359                      {
360                         thread = new ServerThread(socket, this, clientpool, threadpool, timeout, serverSocketClass);
361                         newThread = true;
362                      }
363                      if(thread == null)
364                      {
365                         clientpool.evict();
366                         if(trace)
367                         {
368                            log.trace("Waiting for a thread...");
369                         }
370                         clientpool.wait();
371                         if(trace)
372                         {
373                            log.trace("Notified of available thread");
374                         }
375                      }
376                   }
377                }
378             }
379             synchronized(clientpool)
380             {
381                clientpool.insert(thread, thread);
382             }
383
384             if(newThread)
385             {
386                if(trace)
387                {
388                   log.trace("Created a new thread, t=" + thread);
389                }
390                thread.start();
391             }
392             else
393             {
394                if(trace)
395                {
396                   log.trace("Reusing thread t=" + thread);
397                }
398                thread.wakeup(socket, timeout);
399             }
400          }
401          catch(Throwable JavaDoc ex)
402          {
403             if(running)
404             {
405                log.error("Failed to accept socket connection", ex);
406             }
407          }
408       }
409    }
410
411    /**
412     * returns true if the transport is bi-directional in nature, for example,
413     * SOAP in unidirectional and SOCKETs are bi-directional (unless behind a firewall
414     * for example).
415     *
416     * @return
417     */

418    public boolean isTransportBiDirectional()
419    {
420       return true;
421    }
422
423    /**
424     * Each implementation of the remote client invoker should have
425     * a default data type that is uses in the case it is not specified
426     * in the invoker locator uri.
427     *
428     * @return
429     */

430    protected String JavaDoc getDefaultDataType()
431    {
432       return SerializableMarshaller.DATATYPE;
433    }
434
435 }
436
Popular Tags