KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > codehaus > aspectwerkz > connectivity > RemoteProxyServer


/**************************************************************************************
 * Copyright (c) Jonas Bonér, Alexandre Vasseur. All rights reserved.                 *
 * http://aspectwerkz.codehaus.org                                                    *
 * ---------------------------------------------------------------------------------- *
 * The software in this package is published under the terms of the LGPL license      *
 * a copy of which has been included with this distribution in the license.txt file.  *
 **************************************************************************************/

8 package org.codehaus.aspectwerkz.connectivity;
9
10 import EDU.oswego.cs.dl.util.concurrent.BoundedBuffer;
11 import EDU.oswego.cs.dl.util.concurrent.LinkedQueue;
12 import EDU.oswego.cs.dl.util.concurrent.PooledExecutor;
13 import org.codehaus.aspectwerkz.exception.WrappedRuntimeException;
14
15 import java.io.FileInputStream JavaDoc;
16 import java.io.IOException JavaDoc;
17 import java.net.InetAddress JavaDoc;
18 import java.net.ServerSocket JavaDoc;
19 import java.net.Socket JavaDoc;
20 import java.util.Properties JavaDoc;
21
22 /**
23  * Server that listens to a specified port for client requests. <p/>The implementation is based on sockets. <p/>The
24  * invoker spawns a specified number of listener threads in which each one of these spawns a new RemoteProxyServerThread
25  * for each client request that comes in. <p/>Uses a thread pool from util.concurrent.
26  *
27  * @author <a HREF="mailto:jboner@codehaus.org">Jonas BonŽr </a>
28  */

29 public class RemoteProxyServer implements Runnable JavaDoc {
30     private static String JavaDoc HOST_NAME;
31
32     private static int PORT;
33
34     private static boolean BOUNDED_THREAD_POOL;
35
36     private static boolean LISTENER_THREAD_RUN_AS_DAEMON;
37
38     private static int BACKLOG;
39
40     private static int NUM_LISTENER_THREADS;
41
42     private static int LISTENER_THREAD_PRIORITY = Thread.NORM_PRIORITY;
43
44     private static int CLIENT_THREAD_TIMEOUT;
45
46     private static int THREAD_POOL_MAX_SIZE;
47
48     private static int THREAD_POOL_MIN_SIZE;
49
50     private static int THREAD_POOL_INIT_SIZE;
51
52     private static int THREAD_POOL_KEEP_ALIVE_TIME;
53
54     private static boolean THREAD_POOL_WAIT_WHEN_BLOCKED;
55
56     /**
57      * Initalize the server properties.
58      */

59     static {
60         Properties JavaDoc properties = new Properties JavaDoc();
61         try {
62             properties.load(new FileInputStream JavaDoc(System.getProperty("aspectwerkz.resource.bundle")));
63         } catch (Exception JavaDoc e) {
64             System.out.println("no aspectwerkz resource bundle found on classpath, using defaults");
65
66             // ignore, use defaults
67
}
68         String JavaDoc property = properties.getProperty("remote.server.hostname");
69         if (property == null) {
70             HOST_NAME = property;
71         } else {
72             HOST_NAME = property;
73         }
74         property = properties.getProperty("remote.server.port");
75         if (property == null) {
76             PORT = 7777;
77         } else {
78             PORT = Integer.parseInt(property);
79         }
80         property = properties.getProperty("remote.server.listener.threads.backlog");
81         if (property == null) {
82             BACKLOG = 200;
83         } else {
84             BACKLOG = Integer.parseInt(property);
85         }
86         property = properties.getProperty("remote.server.listener.threads.nr");
87         if (property == null) {
88             NUM_LISTENER_THREADS = 10;
89         } else {
90             NUM_LISTENER_THREADS = Integer.parseInt(property);
91         }
92         property = properties.getProperty("remote.server.client.threads.timeout");
93         if (property == null) {
94             CLIENT_THREAD_TIMEOUT = 60000;
95         } else {
96             CLIENT_THREAD_TIMEOUT = Integer.parseInt(property);
97         }
98         property = properties.getProperty("remote.server.thread.pool.max.size");
99         if (property == null) {
100             THREAD_POOL_MAX_SIZE = 100;
101         } else {
102             THREAD_POOL_MAX_SIZE = Integer.parseInt(property);
103         }
104         property = properties.getProperty("remote.server.thread.pool.min.size");
105         if (property == null) {
106             THREAD_POOL_MIN_SIZE = 10;
107         } else {
108             THREAD_POOL_MIN_SIZE = Integer.parseInt(property);
109         }
110         property = properties.getProperty("remote.server.thread.pool.init.size");
111         if (property == null) {
112             THREAD_POOL_INIT_SIZE = 10;
113         } else {
114             THREAD_POOL_INIT_SIZE = Integer.parseInt(property);
115         }
116         property = properties.getProperty("remote.server.thread.pool.keep.alive.time");
117         if (property == null) {
118             THREAD_POOL_KEEP_ALIVE_TIME = 300000;
119         } else {
120             THREAD_POOL_KEEP_ALIVE_TIME = Integer.parseInt(property);
121         }
122         property = properties.getProperty("remote.server.thread.pool.type");
123         if ((property != null) && property.equals("dynamic")) {
124             BOUNDED_THREAD_POOL = false;
125         } else {
126             BOUNDED_THREAD_POOL = true;
127         }
128         property = properties.getProperty("remote.server.listener.threads.run.as.daemon");
129         if ((property != null) && property.equals("true")) {
130             LISTENER_THREAD_RUN_AS_DAEMON = true;
131         } else {
132             LISTENER_THREAD_RUN_AS_DAEMON = false;
133         }
134         property = properties.getProperty("remote.server.thread.pool.wait.when.blocked");
135         if ((property != null) && property.equals("true")) {
136             THREAD_POOL_WAIT_WHEN_BLOCKED = true;
137         } else {
138             THREAD_POOL_WAIT_WHEN_BLOCKED = false;
139         }
140     }
141
142     /**
143      * The server socket.
144      */

145     private ServerSocket JavaDoc m_serverSocket = null;
146
147     /**
148      * All listener threads.
149      */

150     private Thread JavaDoc[] m_listenerThreads = null;
151
152     /**
153      * The thread pool.
154      */

155     private PooledExecutor m_threadPool = null;
156
157     /**
158      * The class loader to use.
159      */

160     private ClassLoader JavaDoc m_loader = null;
161
162     /**
163      * The invoker instance.
164      */

165     private Invoker m_invoker = null;
166
167     /**
168      * Marks the server as running.
169      */

170     private boolean m_running = true;
171
172     /**
173      * Starts a server object and starts listening for client access.
174      *
175      * @param loader the classloader to use
176      * @param invoker the invoker that makes the method invocation in the client thread
177      */

178     public RemoteProxyServer(final ClassLoader JavaDoc loader, final Invoker invoker) {
179         m_invoker = invoker;
180         m_loader = loader;
181     }
182
183     /**
184      * Starts up the proxy server.
185      */

186     public void start() {
187         m_running = true;
188         try {
189             InetAddress JavaDoc bindAddress = InetAddress.getByName(HOST_NAME);
190             m_serverSocket = new ServerSocket JavaDoc(PORT, BACKLOG, bindAddress);
191             if (BOUNDED_THREAD_POOL) {
192                 createBoundedThreadPool(
193                         THREAD_POOL_MAX_SIZE,
194                         THREAD_POOL_MIN_SIZE,
195                         THREAD_POOL_INIT_SIZE,
196                         THREAD_POOL_KEEP_ALIVE_TIME,
197                         THREAD_POOL_WAIT_WHEN_BLOCKED
198                 );
199             } else {
200                 createDynamicThreadPool(THREAD_POOL_MIN_SIZE, THREAD_POOL_INIT_SIZE, THREAD_POOL_KEEP_ALIVE_TIME);
201             }
202             m_listenerThreads = new Thread JavaDoc[NUM_LISTENER_THREADS];
203             for (int i = 0; i < NUM_LISTENER_THREADS; i++) {
204                 m_listenerThreads[i] = new Thread JavaDoc(this);
205                 m_listenerThreads[i].setName("AspectWerkz::Listener " + (i + 1));
206                 m_listenerThreads[i].setDaemon(LISTENER_THREAD_RUN_AS_DAEMON);
207                 m_listenerThreads[i].setPriority(LISTENER_THREAD_PRIORITY);
208                 m_listenerThreads[i].start();
209             }
210         } catch (IOException JavaDoc e) {
211             throw new WrappedRuntimeException(e);
212         }
213     }
214
215     /**
216      * Stops the socket proxy server.
217      */

218     public void stop() {
219         m_running = false;
220         for (int i = 0; i < NUM_LISTENER_THREADS; i++) {
221             m_listenerThreads[i].interrupt();
222         }
223         m_threadPool.shutdownNow();
224     }
225
226     /**
227      * Does the actual work of listening for a client request and spawns a new RemoteProxyServerThread to serve the
228      * client.
229      */

230     public void run() {
231         try {
232             while (m_running) {
233                 final Socket JavaDoc clientSocket = m_serverSocket.accept();
234                 synchronized (m_threadPool) {
235                     m_threadPool.execute(
236                             new RemoteProxyServerThread(
237                                     clientSocket,
238                                     m_loader,
239                                     m_invoker,
240                                     CLIENT_THREAD_TIMEOUT
241                             )
242                     );
243                 }
244             }
245             m_serverSocket.close();
246         } catch (Exception JavaDoc e) {
247             throw new WrappedRuntimeException(e);
248         }
249     }
250
251     /**
252      * Creates a new bounded thread pool.
253      *
254      * @param threadPoolMaxSize
255      * @param threadPoolMinSize
256      * @param threadPoolInitSize
257      * @param keepAliveTime
258      * @param waitWhenBlocked
259      */

260     private void createBoundedThreadPool(final int threadPoolMaxSize,
261                                          final int threadPoolMinSize,
262                                          final int threadPoolInitSize,
263                                          final int keepAliveTime,
264                                          final boolean waitWhenBlocked) {
265         m_threadPool = new PooledExecutor(new BoundedBuffer(threadPoolInitSize), threadPoolMaxSize);
266         m_threadPool.setKeepAliveTime(keepAliveTime);
267         m_threadPool.createThreads(threadPoolInitSize);
268         m_threadPool.setMinimumPoolSize(threadPoolMinSize);
269         if (waitWhenBlocked) {
270             m_threadPool.waitWhenBlocked();
271         }
272     }
273
274     /**
275      * Creates a new dynamic thread pool
276      *
277      * @param threadPoolMinSize
278      * @param threadPoolInitSize
279      * @param keepAliveTime
280      */

281     private void createDynamicThreadPool(final int threadPoolMinSize,
282                                          final int threadPoolInitSize,
283                                          final int keepAliveTime) {
284         m_threadPool = new PooledExecutor(new LinkedQueue());
285         m_threadPool.setKeepAliveTime(keepAliveTime);
286         m_threadPool.createThreads(threadPoolInitSize);
287         m_threadPool.setMinimumPoolSize(threadPoolMinSize);
288     }
289 }
Popular Tags