KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jboss > remoting > transport > socket > ServerThread


1 /***************************************
2  * *
3  * JBoss: The OpenSource J2EE WebOS *
4  * *
5  * Distributable under LGPL license. *
6  * See terms of license at gnu.org. *
7  * *
8  ***************************************/

9
10 package org.jboss.remoting.transport.socket;
11
12 import java.io.InputStream JavaDoc;
13 import java.io.InterruptedIOException JavaDoc;
14 import java.io.OutputStream JavaDoc;
15 import java.lang.reflect.Constructor JavaDoc;
16 import java.net.Socket JavaDoc;
17 import java.util.LinkedList JavaDoc;
18 import org.jboss.logging.Logger;
19 import org.jboss.remoting.marshal.MarshalFactory;
20 import org.jboss.remoting.marshal.Marshaller;
21 import org.jboss.remoting.marshal.UnMarshaller;
22
23 /**
24  * This Thread object hold a single Socket connection to a client
25  * and is kept alive until a timeout happens, or it is aged out of the
26  * SocketServerInvoker's LRU cache.
27  * <p/>
28  * There is also a separate thread pool that is used if the client disconnects.
29  * This thread/object is re-used in that scenario and that scenario only.
30  * <p/>
31  * This is a customization of the same ServerThread class used witht the PookedInvoker.
32  * The custimization was made to allow for remoting marshaller/unmarshaller.
33  *
34  * @author <a HREF="mailto:bill@jboss.org">Bill Burke</a>
35  * @author <a HREF="mailto:tom@jboss.org">Tom Elrod</a>
36  * @version $Revision: 1.4 $
37  */

38 public class ServerThread extends Thread JavaDoc
39 {
40    final static private Logger log = Logger.getLogger(ServerThread.class);
41
42    protected SocketServerInvoker invoker;
43    protected LRUPool clientpool;
44    protected LinkedList JavaDoc threadpool;
45    protected volatile boolean running = true;
46    protected volatile boolean handlingResponse = true; // start off as true so that nobody can interrupt us
47
protected volatile boolean shutdown = false;
48    protected static int id = 0;
49
50    private SocketWrapper socketWrapper = null;
51    protected String JavaDoc serverSocketClass = ServerSocketWrapper.class.getName();
52    private Constructor JavaDoc serverSocketConstructor = null;
53
54
55    public static synchronized int nextID()
56    {
57       int nextID = id++;
58       return nextID;
59    }
60
61    public ServerThread(Socket JavaDoc socket, SocketServerInvoker invoker, LRUPool clientpool,
62                        LinkedList JavaDoc threadpool, int timeout, String JavaDoc serverSocketClass) throws Exception JavaDoc
63    {
64       super("SocketServerInvokerThread-" + socket.getInetAddress().getHostAddress() + "-" + nextID());
65       this.serverSocketClass = serverSocketClass;
66       this.socketWrapper = createServerSocket(socket, timeout);
67       this.invoker = invoker;
68       this.clientpool = clientpool;
69       this.threadpool = threadpool;
70    }
71
72    public void shutdown()
73    {
74       shutdown = true;
75       running = false;
76       // This is a race and there is a chance
77
// that a invocation is going on at the time
78
// of the interrupt. But I see no way right
79
// now to protect for this.
80

81       // NOTE ALSO!:
82
// Shutdown should never be synchronized.
83
// We don't want to hold up accept() thread! (via LRUpool)
84
if(!handlingResponse)
85       {
86          try
87          {
88             this.interrupt();
89             Thread.interrupted(); // clear
90
}
91          catch(Exception JavaDoc ignored)
92          {
93          }
94       }
95
96    }
97
98    private SocketWrapper createServerSocket(Socket JavaDoc socket, int timeout) throws Exception JavaDoc
99    {
100       if(serverSocketConstructor == null)
101       {
102          //ClassLoader classLoader = invoker.getClassLoader();
103
ClassLoader JavaDoc classLoader = null;
104          if(classLoader == null)
105          {
106             classLoader = Thread.currentThread().getContextClassLoader();
107
108             if(classLoader == null)
109             {
110                classLoader = getClass().getClassLoader();
111             }
112          }
113          Class JavaDoc cl = classLoader.loadClass(serverSocketClass);
114
115          serverSocketConstructor = cl.getConstructor(new Class JavaDoc[]{Socket JavaDoc.class});
116       }
117       SocketWrapper serverSocketWrapper = (SocketWrapper) serverSocketConstructor.newInstance(new Object JavaDoc[]{socket});
118       serverSocketWrapper.setTimeout(timeout);
119
120       return serverSocketWrapper;
121    }
122
123
124    public void evict()
125    {
126       running = false;
127       // This is a race and there is a chance
128
// that a invocation is going on at the time
129
// of the interrupt. But I see no way right
130
// now to protect for this.
131
// There may not be a problem because interrupt only effects
132
// threads blocking on IO.
133

134
135       // NOTE ALSO!:
136
// Shutdown should never be synchronized.
137
// We don't want to hold up accept() thread! (via LRUpool)
138
if(!handlingResponse)
139       {
140          try
141          {
142             this.interrupt();
143             Thread.interrupted(); // clear
144
}
145          catch(Exception JavaDoc ignored)
146          {
147          }
148       }
149    }
150
151
152    public synchronized void wakeup(Socket JavaDoc socket, int timeout) throws Exception JavaDoc
153    {
154       this.socketWrapper = createServerSocket(socket, timeout);
155       String JavaDoc name = "SocketServerInvokerThread-" + socket.getInetAddress().getHostAddress() + "-" + nextID();
156       super.setName(name);
157       running = true;
158       handlingResponse = true;
159       this.notify();
160    }
161
162    public void run()
163    {
164       try
165       {
166          while(true)
167          {
168             dorun();
169             if(shutdown)
170             {
171                synchronized(clientpool)
172                {
173                   clientpool.remove(this);
174                }
175                return; // exit thread
176
}
177             else
178             {
179                synchronized(this)
180                {
181                   synchronized(clientpool)
182                   {
183                      synchronized(threadpool)
184                      {
185                         clientpool.remove(this);
186                         threadpool.add(this);
187                         Thread.interrupted(); // clear any interruption so that we can be pooled.
188
clientpool.notify();
189                      }
190                   }
191                   log.debug("begin thread wait");
192                   this.wait();
193                   log.debug("WAKEUP in SERVER THREAD");
194                }
195             }
196          }
197       }
198       catch(Exception JavaDoc ignored)
199       {
200          log.debug("Exiting run on exception", ignored);
201       }
202    }
203
204    protected void acknowledge() throws Exception JavaDoc
205    {
206       // HERE IS THE RACE between ACK received and handlingResponse = true
207
// We can't synchronize because readByte blocks and client is expecting
208
// a response and we don't want to hang client.
209
// see shutdown and evict for more details
210
// There may not be a problem because interrupt only effects
211
// threads blocking on IO. and this thread will just continue.
212
handlingResponse = true;
213
214       socketWrapper.checkConnection();
215
216       handlingResponse = false;
217    }
218
219    protected void processInvocation() throws Exception JavaDoc
220    {
221       handlingResponse = true;
222       // Ok, now read invocation and invoke
223

224
225       //TODO: -TME Need better way to get the unmarshaller (via config)
226
UnMarshaller unmarshaller = MarshalFactory.getUnMarshaller(invoker.getLocator(), this.getClass().getClassLoader());
227       if(unmarshaller == null)
228       {
229          unmarshaller = MarshalFactory.getUnMarshaller(invoker.getDataType());
230       }
231       Object JavaDoc obj = unmarshaller.read(socketWrapper.getInputStream(), null);
232
233       Object JavaDoc resp = null;
234       try
235       {
236          // Make absolutely sure thread interrupted is cleared.
237
boolean interrupted = Thread.interrupted();
238          // call transport on the subclass, get the result to handback
239
resp = invoker.invoke(obj);
240          /*
241          if(log.isDebugEnabled())
242          {
243             System.err.println("++ returning invocation response : " + resp);
244          }
245          */

246       }
247       catch(Exception JavaDoc ex)
248       {
249          resp = ex;
250       }
251
252       Thread.interrupted(); // clear interrupted state so we don't fail on socket writes
253

254       Marshaller marshaller = MarshalFactory.getMarshaller(invoker.getLocator(), this.getClass().getClassLoader());
255
256       if(marshaller == null)
257       {
258          marshaller = MarshalFactory.getMarshaller(invoker.getDataType());
259       }
260       marshaller.write(resp, socketWrapper.getOutputStream());
261
262       handlingResponse = false;
263    }
264
265    /**
266     * This is needed because Object*Streams leak
267     */

268    protected void dorun()
269    {
270       log.debug("beginning dorun");
271       running = true;
272       handlingResponse = true;
273
274       // Always do first one without an ACK because its not needed
275
try
276       {
277          processInvocation();
278       }
279       catch(Exception JavaDoc ex)
280       {
281          log.debug("failed to process invocation.", ex);
282          running = false;
283       }
284
285       // Re-use loop
286
while(running)
287       {
288          try
289          {
290             acknowledge();
291             processInvocation();
292          }
293          catch(InterruptedIOException JavaDoc e)
294          {
295             log.debug("socket timed out", e);
296             running = false;
297          }
298          catch(InterruptedException JavaDoc e)
299          {
300             log.debug("interrupted", e);
301          }
302          catch(Exception JavaDoc ex)
303          {
304             log.debug("failed", ex);
305             running = false;
306          }
307          // clear any interruption so that thread can be pooled.
308
Thread.interrupted();
309       }
310       // Ok, we've been shutdown. Do appropriate cleanups.
311
try
312       {
313          InputStream JavaDoc in = socketWrapper.getInputStream();
314          if(in != null)
315          {
316             in.close();
317          }
318          OutputStream JavaDoc out = socketWrapper.getOutputStream();
319          if(out != null)
320          {
321             out.close();
322          }
323       }
324       catch(Exception JavaDoc ex)
325       {
326       }
327       try
328       {
329          socketWrapper.close();
330       }
331       catch(Exception JavaDoc ex)
332       {
333          log.error("Failed cleanup", ex);
334       }
335       socketWrapper = null;
336    }
337 }
338
Popular Tags