KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jboss > invocation > pooled > server > ServerThread


1 /*
2 * JBoss, Home of Professional Open Source
3 * Copyright 2005, JBoss Inc., and individual contributors as indicated
4 * by the @authors tag. See the copyright.txt in the distribution for a
5 * full listing of individual contributors.
6 *
7 * This is free software; you can redistribute it and/or modify it
8 * under the terms of the GNU Lesser General Public License as
9 * published by the Free Software Foundation; either version 2.1 of
10 * the License, or (at your option) any later version.
11 *
12 * This software is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * Lesser General Public License for more details.
16 *
17 * You should have received a copy of the GNU Lesser General Public
18 * License along with this software; if not, write to the Free
19 * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
20 * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
21 */

22 package org.jboss.invocation.pooled.server;
23
24 import java.io.BufferedInputStream JavaDoc;
25 import java.io.BufferedOutputStream JavaDoc;
26 import java.io.InterruptedIOException JavaDoc;
27 import java.io.ObjectInputStream JavaDoc;
28 import java.io.ObjectOutputStream JavaDoc;
29 import java.net.Socket JavaDoc;
30 import java.util.LinkedList JavaDoc;
31
32 import org.jboss.invocation.Invocation;
33 import org.jboss.invocation.pooled.interfaces.OptimizedObjectInputStream;
34 import org.jboss.invocation.pooled.interfaces.OptimizedObjectOutputStream;
35 import org.jboss.logging.Logger;
36
37 /**
38  * This Thread object hold a single Socket connection to a client
39  * and is kept alive until a timeout happens, or it is aged out of the
40  * PooledInvoker's LRU cache.
41  *
42  * There is also a separate thread pool that is used if the client disconnects.
43  * This thread/object is re-used in that scenario and that scenario only.
44  *
45  * This class will demarshal then delegate to PooledInvoker for invocation.
46  *
47  *
48  * *NOTES* ObjectStreams were found to be better performing than the Custom marshalling
49  * done by the TrunkInvoker.
50  *
51  * @author <a HREF="mailto:bill@jboss.org">Bill Burke</a>
52  * @author Scott.Stark@jboss.org
53  * @version $Revision: 40982 $
54  */

55 public class ServerThread extends Thread JavaDoc
56 {
57    final static private Logger log = Logger.getLogger(ServerThread.class);
58
59    protected ObjectInputStream JavaDoc in;
60    protected ObjectOutputStream JavaDoc out;
61    protected Socket JavaDoc socket;
62    protected PooledInvoker invoker;
63    protected LRUPool clientpool;
64    protected LinkedList JavaDoc threadpool;
65    protected volatile boolean running = true;
66    protected volatile boolean handlingResponse = true; // start off as true so that nobody can interrupt us
67
protected volatile boolean shutdown = false;
68    protected boolean trace;
69    protected static int id = 0;
70
71    public static synchronized int nextID()
72    {
73       int nextID = id ++;
74       return nextID;
75    }
76
77    public ServerThread(Socket JavaDoc socket, PooledInvoker invoker, LRUPool clientpool,
78       LinkedList JavaDoc threadpool, int timeout) throws Exception JavaDoc
79    {
80       super("PooledInvokerThread-" + socket.getInetAddress().getHostAddress()+"-"+nextID());
81       this.socket = socket;
82       this.invoker = invoker;
83       this.clientpool = clientpool;
84       this.threadpool = threadpool;
85       this.trace = log.isTraceEnabled();
86       socket.setSoTimeout(timeout);
87    }
88
89    public void shutdown()
90    {
91       shutdown = true;
92       running = false;
93       // This is a race and there is a chance
94
// that a invocation is going on at the time
95
// of the interrupt. But I see no way right
96
// now to protect for this.
97

98       // NOTE ALSO!:
99
// Shutdown should never be synchronized.
100
// We don't want to hold up accept() thread! (via LRUpool)
101
if (!handlingResponse)
102       {
103          try
104          {
105             this.interrupt();
106             Thread.interrupted(); // clear
107
}
108          catch (Exception JavaDoc ignored) {}
109       }
110       
111    }
112
113    public void evict()
114    {
115       running = false;
116       // This is a race and there is a chance
117
// that a invocation is going on at the time
118
// of the interrupt. But I see no way right
119
// now to protect for this.
120
// There may not be a problem because interrupt only effects
121
// threads blocking on IO.
122

123
124       // NOTE ALSO!:
125
// Shutdown should never be synchronized.
126
// We don't want to hold up accept() thread! (via LRUpool)
127
if (!handlingResponse)
128       {
129          try
130          {
131             this.interrupt();
132             Thread.interrupted(); // clear
133
}
134          catch (Exception JavaDoc ignored) {}
135       }
136    }
137
138
139    public synchronized void wakeup(Socket JavaDoc socket, int timeout) throws Exception JavaDoc
140    {
141       this.socket = socket;
142       String JavaDoc name = "PooledInvokerThread-" + socket.getInetAddress().getHostAddress()+"-"+nextID();
143       super.setName(name);
144       socket.setSoTimeout(timeout);
145       running = true;
146       handlingResponse = true;
147       this.notify();
148    }
149
150    public void run()
151    {
152       try
153       {
154          while (true)
155          {
156             dorun();
157             //System.out.println("finished....");
158
if (shutdown)
159             {
160                //System.out.println("doing shutdown");
161
synchronized (clientpool)
162                {
163                   clientpool.remove(this);
164                }
165                return; // exit thread
166
}
167             else
168             {
169                //System.out.println("save thread");
170
synchronized (this)
171                {
172                   //System.out.println("synch on client pool");
173
synchronized(clientpool)
174                   {
175                      //System.out.println("synch on thread pool");
176
synchronized(threadpool)
177                      {
178                         //System.out.println("removing myself from the pool: " + clientpool.size());
179
clientpool.remove(this);
180                         //System.out.println("adding myself to threadpool");
181
threadpool.add(this);
182                         Thread.interrupted(); // clear any interruption so that we can be pooled.
183
clientpool.notify();
184                      }
185                   }
186                   if( trace )
187                      log.trace("begin thread wait");
188                   this.wait();
189                   if( trace )
190                      log.trace("WAKEUP in SERVER THREAD");
191                }
192             }
193          }
194       }
195       catch (Exception JavaDoc ignored)
196       {
197          if( trace )
198             log.trace("Exiting run on exception", ignored);
199       }
200    }
201
202    protected void acknowledge() throws Exception JavaDoc
203    {
204       //System.out.println("****acknowledge " + Thread.currentThread());
205
// Perform acknowledgement to convince client
206
// that the socket is still active
207
byte ACK = in.readByte();
208       //System.out.println("****acknowledge read byte" + Thread.currentThread());
209

210       // HERE IS THE RACE between ACK received and handlingResponse = true
211
// We can't synchronize because readByte blocks and client is expecting
212
// a response and we don't want to hang client.
213
// see shutdown and evict for more details
214
// There may not be a problem because interrupt only effects
215
// threads blocking on IO. and this thread will just continue.
216
handlingResponse = true;
217       
218       out.writeByte(ACK);
219       out.flush();
220    }
221
222    protected void processInvocation() throws Exception JavaDoc
223    {
224       handlingResponse = true;
225       // Ok, now read invocation and invoke
226
Invocation invocation = (Invocation)in.readObject();
227       in.readObject(); // for stupid ObjectInputStream reset
228
Object JavaDoc response = null;
229       try
230       {
231           // Make absolutely sure thread interrupted is cleared.
232
boolean interrupted = Thread.interrupted();
233          response = invoker.invoke(invocation);
234       }
235       catch (Exception JavaDoc ex)
236       {
237          response = ex;
238       }
239       Thread.interrupted(); // clear interrupted state so we don't fail on socket writes
240
out.writeObject(response);
241       out.reset();
242       // to make sure stream gets reset
243
// Stupid ObjectInputStream holds object graph
244
// can only be set by the client/server sending a TC_RESET
245
out.writeObject(Boolean.TRUE);
246       out.flush();
247       out.reset();
248       handlingResponse = false;
249    }
250
251    /**
252     * This is needed because Object*Streams leak
253     */

254    protected void dorun()
255    {
256       log.debug("beginning dorun");
257       running = true;
258       handlingResponse = true;
259       try
260       {
261          BufferedOutputStream JavaDoc bos = new BufferedOutputStream JavaDoc(socket.getOutputStream());
262          out = new OptimizedObjectOutputStream(bos);
263          out.flush();
264          BufferedInputStream JavaDoc bis = new BufferedInputStream JavaDoc(socket.getInputStream());
265          in = new OptimizedObjectInputStream(bis);
266       }
267       catch (Exception JavaDoc e)
268       {
269          log.error("Failed to initialize", e);
270       }
271
272       // Always do first one without an ACK because its not needed
273
try
274       {
275          processInvocation();
276       }
277       catch (Exception JavaDoc e)
278       {
279          running = false;
280          if( trace )
281             log.trace("invocation failed", e);
282       }
283
284       // Re-use loop
285
while (running)
286       {
287          try
288          {
289             acknowledge();
290             processInvocation();
291          }
292          catch (InterruptedIOException JavaDoc e)
293          {
294             log.debug("socket timed out", e);
295             running = false;
296          }
297          catch (InterruptedException JavaDoc e)
298          {
299             log.debug("interrupted", e);
300          }
301          catch (Exception JavaDoc ex)
302          {
303             if( trace )
304                log.debug("invocation failed", ex);
305             running = false;
306          }
307          // clear any interruption so that thread can be pooled.
308
Thread.interrupted();
309       }
310
311       if( trace )
312          log.trace("finished loop");
313       // Ok, we've been shutdown. Do appropriate cleanups.
314
try
315       {
316          if (in != null) in.close();
317          if (out != null) out.close();
318       }
319       catch (Exception JavaDoc ex)
320       {
321       }
322       try
323       {
324          socket.close();
325       }
326       catch (Exception JavaDoc ex)
327       {
328          log.debug("Failed cleanup", ex);
329       }
330       socket = null;
331       in = null;
332       out = null;
333    }
334 }
335
Popular Tags