KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jboss > mq > il > oil2 > OIL2SocketHandler


1 /*
2  * JBoss, the OpenSource J2EE webOS
3  *
4  * Distributable under LGPL license.
5  * See terms of license at gnu.org.
6  */

7 package org.jboss.mq.il.oil2;
8
9 import java.io.IOException JavaDoc;
10 import java.io.ObjectInputStream JavaDoc;
11 import java.io.ObjectOutputStream JavaDoc;
12 import java.util.Iterator JavaDoc;
13
14 import org.jboss.logging.Logger;
15
16 import EDU.oswego.cs.dl.util.concurrent.Channel;
17 import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap;
18 import EDU.oswego.cs.dl.util.concurrent.LinkedQueue;
19 import EDU.oswego.cs.dl.util.concurrent.PooledExecutor;
20 import EDU.oswego.cs.dl.util.concurrent.Slot;
21 import EDU.oswego.cs.dl.util.concurrent.ThreadFactory;
22
23 /**
24  * The OIL2 implementation of the ServerIL object
25  *
26  * @author <a HREF="mailto:hiram.chirino@jboss.org">Hiram Chirino</a>
27  * @version $Revision: 1$
28  */

29 public final class OIL2SocketHandler implements java.lang.Cloneable JavaDoc, Runnable JavaDoc
30 {
31    final static private Logger log = Logger.getLogger(OIL2SocketHandler.class);
32
33    /**
34     * Messages will be read from the input stream
35     */

36    private ObjectInputStream JavaDoc in;
37
38    /**
39     * Messages will be writen to the output stream
40     */

41    private ObjectOutputStream JavaDoc out;
42
43    /**
44     * Should we be receiving messages??
45     */

46    private boolean running;
47
48    /**
49     * The thread group that the reader thread should join.
50     */

51    private final ThreadGroup JavaDoc partentThreadGroup;
52
53    /**
54     * Reader thread.
55     */

56    private Thread JavaDoc worker;
57
58    /**
59     * Number of OIL2 Worker threads started.
60     */

61    private static int threadNumber = 0;
62
63    /**
64     * Requst create slots to wait for responses,
65     * those slots are stored in this hashmap.
66     *
67     * This field uses copy on write semantics.
68     */

69    volatile ConcurrentHashMap responseSlots = new ConcurrentHashMap();
70
71    /**
72     * The request listner is notified of new requests
73     * and of asyncronous IO errors.
74     */

75    OIL2RequestListner requestListner;
76
77    /**
78     * If the socket handler is currently pumping messages.
79     */

80    private volatile boolean pumpingData = false;
81
82    /**
83     * Pump mutex
84     */

85    private Object JavaDoc pumpMutex = new Object JavaDoc();
86
87    /**
88     * The that new request get placed into when they arrived.
89     */

90    LinkedQueue requestQueue = new LinkedQueue();
91
92    /**
93     * The thread pool used to service incoming requests..
94     */

95    PooledExecutor pool;
96
97    /**
98     * Constructor for the OILServerIL object
99     *
100     * @param a Description of Parameter
101     * @param port Description of Parameter
102     */

103    public OIL2SocketHandler(ObjectInputStream JavaDoc in, ObjectOutputStream JavaDoc out, ThreadGroup JavaDoc partentThreadGroup)
104    {
105       this.in = in;
106       this.out = out;
107       this.partentThreadGroup = partentThreadGroup;
108
109       synchronized (OIL2SocketHandler.class)
110       {
111          if (pool == null)
112          {
113             pool = new PooledExecutor(50);
114             // supply a ThreadFactory to the pool to create daemon threads
115
log.debug("Setting the OIL2SocketHandler's thread factory");
116             pool.setThreadFactory(
117                new ThreadFactory()
118                {
119                   private int threadNo = 0;
120                   public Thread JavaDoc newThread(Runnable JavaDoc r)
121                   {
122                      Thread JavaDoc t = new Thread JavaDoc(OIL2SocketHandler.this.partentThreadGroup, r, "OIL2SocketHandler Thread-" + threadNo++);
123                      t.setDaemon(true);
124                      return t;
125                   }
126                }
127             );
128             pool.setMinimumPoolSize(1);
129             pool.setKeepAliveTime(1000 * 60);
130             pool.runWhenBlocked();
131             pool.createThreads(1);
132          }
133       }
134    }
135
136    /**
137     * #Description of the Method
138     *
139     * @return Description of the Returned Value
140     * @exception Exception Description of Exception
141     */

142    public void sendRequest(OIL2Request request) throws IOException JavaDoc
143    {
144 // if (log.isTraceEnabled())
145
// log.trace("Sending request: " + request);
146

147       try
148       {
149          synchronized (out)
150          {
151             out.writeByte(1);
152             request.writeExternal(out);
153             out.reset();
154             out.flush();
155          }
156       }
157       catch (IOException JavaDoc e)
158       {
159          throw e;
160       }
161
162    }
163
164    /**
165     * #Description of the Method
166     */

167    private void registerResponseSlot(OIL2Request request, Slot responseSlot) throws IOException JavaDoc
168    {
169       responseSlots.put(request.requestId, responseSlot);
170    }
171
172    /**
173     * #Description of the Method
174     */

175    public void setRequestListner(OIL2RequestListner requestListner)
176    {
177       this.requestListner = requestListner;
178    }
179
180    /**
181     * #Description of the Method
182     *
183     * @return Description of the Returned Value
184     * @exception Exception Description of Exception
185     */

186    public void sendResponse(OIL2Response response) throws IOException JavaDoc
187    {
188 // if (log.isTraceEnabled())
189
// log.trace("Sending response: " + response);
190

191       try
192       {
193          synchronized (out)
194          {
195             out.writeByte(2);
196             response.writeExternal(out);
197             out.reset();
198             out.flush();
199          }
200       }
201       catch (IOException JavaDoc e)
202       {
203          throw e;
204       }
205    }
206
207    /**
208     * Pumps messages from the input stream.
209     *
210     * If the request object is not null, then the target message is
211     * the response object for the request argument. The target
212     * message is returned.
213     *
214     * If the request object is null, then the target message is
215     * the first new request that is encountered. The new request
216     * messag is returned.
217     *
218     * All message received before the target message are pumped.
219     * A pumped message is placed in either Response Slots or
220     * the Request Queue depending on if the message is a response
221     * or requests.
222     *
223     * @param request The request object that is waiting for a response.
224     * @return the request or reponse object that this method was looking for
225     * @exception IOException Description of Exception
226     */

227    private Object JavaDoc pumpMessages(OIL2Request request, Channel mySlot)
228       throws IOException JavaDoc, ClassNotFoundException JavaDoc, InterruptedException JavaDoc
229    {
230
231       synchronized (pumpMutex)
232       {
233          // Is somebody else pumping data??
234
if (pumpingData)
235          {
236             return null;
237          }
238          else
239             pumpingData = true;
240       }
241
242       try
243       {
244          while (true)
245          {
246             if (mySlot != null)
247             {
248                // Do we have our response sitting in our slot allready??
249
Object JavaDoc o;
250                while ((o = mySlot.peek()) != null)
251                {
252                   o = mySlot.take();
253                   if (o != this)
254                   {
255                      return o;
256                   }
257                }
258             }
259
260             byte code = in.readByte();
261             switch (code)
262             {
263                // Request received... pass it up
264
case 1 :
265                   OIL2Request newRequest = new OIL2Request();
266                   newRequest.readExternal(in);
267
268                   // Are we looking for a request??
269
if (request == null)
270                   {
271                      return newRequest;
272                   }
273                   else
274                   {
275                      requestQueue.put(newRequest);
276                   }
277
278                   break;
279
280                   // Response received... find the response slot
281
case 2 :
282
283                   OIL2Response response = new OIL2Response();
284                   response.readExternal(in);
285
286                   // No reponse id to response to..
287
if (response.correlationRequestId == null)
288                      continue;
289
290                   // Is this the response object we are looking for
291
if (request != null && request.requestId.equals(response.correlationRequestId))
292                   {
293                      return response;
294                   }
295                   else
296                   {
297
298                      Slot slot = (Slot) responseSlots.remove(response.correlationRequestId);
299
300                      if (slot != null)
301                      {
302                         slot.put(response);
303                      }
304                      else
305                      {
306                         // This should not happen...
307
if (log.isTraceEnabled())
308                            log.warn("No slot registered for: " + response);
309                      }
310                   }
311                   break;
312             } // switch
313
} // while
314
}
315       finally
316       {
317          synchronized (pumpMutex)
318          {
319             pumpingData = false;
320          }
321
322          Thread JavaDoc thread = Thread.currentThread();
323          boolean interrupted = thread.isInterrupted();
324
325          // We are done, let somebody know that they can
326
// start pumping us again.
327
Iterator JavaDoc i = responseSlots.values().iterator();
328          while (i.hasNext())
329          {
330             Slot s = (Slot) i.next();
331             if (s != mySlot)
332                s.offer(this, 0);
333          }
334
335          // Only notify the request waiter if we are not
336
// giving him a message on this method call.
337
if (request != null)
338          {
339             requestQueue.put(this);
340          }
341
342          if (interrupted)
343             thread.interrupt();
344       }
345    }
346
347    public OIL2Response synchRequest(OIL2Request request)
348       throws IOException JavaDoc, InterruptedException JavaDoc, ClassNotFoundException JavaDoc
349    {
350
351       // if (log.isTraceEnabled())
352
// log.trace("Sending request: "+request);
353

354       Slot slot = new Slot();
355       registerResponseSlot(request, slot);
356       sendRequest(request);
357
358       Object JavaDoc o = null;
359       while (true)
360       {
361          // Do we have something in our queue??
362
if (o != null)
363          {
364             // was is a request message??
365
if (o != this)
366             {
367                // if (log.isTraceEnabled())
368
// log.trace("Got response: "+o);
369
return (OIL2Response) o;
370             }
371             // See if we have another message in the queue.
372
o = slot.peek();
373             if (o != null)
374                o = slot.take();
375          }
376          else
377          {
378             // We did not have any messages in the slot,
379
// so we have to go pumping..
380
o = pumpMessages(request, slot);
381             if (o == null)
382             {
383                // Somebody else is in the pump, wait till we
384
// are notified to get in.
385
o = slot.take();
386             }
387          }
388       } // end while
389
}
390
391    public class RequestRunner implements Runnable JavaDoc
392    {
393       OIL2Request request;
394       RequestRunner(OIL2Request request)
395       {
396          this.request = request;
397       }
398       public void run()
399       {
400          requestListner.handleRequest(request);
401       }
402    }
403
404    /**
405     * Main processing method for the OILClientILService object
406     */

407    public void run()
408    {
409       try
410       {
411
412          Object JavaDoc o = null;
413          while (running)
414          {
415             // Do we have something in our queue??
416
if (o != null)
417             {
418                // was is a request message??
419
if (o != this)
420                {
421                   pool.execute(new RequestRunner((OIL2Request) o));
422                }
423                // See if we have another message in the queue.
424
o = requestQueue.peek();
425                if (o != null)
426                   o = requestQueue.take();
427             }
428             else
429             {
430                // We did not have any messages in the queue,
431
// so we have to go pumping..
432
o = pumpMessages(null, requestQueue);
433                if (o == null)
434                {
435                   // Somebody else is in the pump, wait till we
436
// are notified to get in.
437
o = requestQueue.take();
438                }
439             }
440          } // end while
441

442       }
443       catch (InterruptedException JavaDoc e)
444       {
445          if (log.isTraceEnabled())
446             log.trace("Stopped due to interruption");
447       }
448       catch (Exception JavaDoc e)
449       {
450          if (log.isTraceEnabled())
451             log.trace("Stopping due to unexcpected exception: ", e);
452          requestListner.handleConnectionException(e);
453       }
454
455       // ensure the flag is set correctly
456
running = false;
457       if (log.isTraceEnabled())
458          log.trace("Stopped");
459    }
460
461    public void start() //throws java.lang.Exception
462
{
463       if (log.isTraceEnabled())
464          log.trace("Starting");
465
466       running = true;
467       worker = new Thread JavaDoc(partentThreadGroup, this, "OIL2 Worker-" + threadNumber++);
468       worker.setDaemon(true);
469       worker.start();
470
471    }
472
473    public void stop()
474    {
475       if (log.isTraceEnabled())
476          log.trace("Stopping");
477       running = false;
478       worker.interrupt();
479    }
480
481 }
482
Popular Tags