KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jboss > mq > il > uil2 > SocketManager


1 /*
2 * JBoss, Home of Professional Open Source
3 * Copyright 2005, JBoss Inc., and individual contributors as indicated
4 * by the @authors tag. See the copyright.txt in the distribution for a
5 * full listing of individual contributors.
6 *
7 * This is free software; you can redistribute it and/or modify it
8 * under the terms of the GNU Lesser General Public License as
9 * published by the Free Software Foundation; either version 2.1 of
10 * the License, or (at your option) any later version.
11 *
12 * This software is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * Lesser General Public License for more details.
16 *
17 * You should have received a copy of the GNU Lesser General Public
18 * License along with this software; if not, write to the Free
19 * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
20 * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
21 */

22 package org.jboss.mq.il.uil2;
23
24 import java.io.IOException JavaDoc;
25 import java.io.ObjectInputStream JavaDoc;
26 import java.io.ObjectOutputStream JavaDoc;
27 import java.net.InetAddress JavaDoc;
28 import java.net.Socket JavaDoc;
29 import java.util.Iterator JavaDoc;
30
31 import javax.jms.JMSException JavaDoc;
32
33 import org.jboss.logging.Logger;
34 import org.jboss.mq.il.uil2.msgs.BaseMsg;
35 import org.jboss.util.stream.NotifyingBufferedInputStream;
36 import org.jboss.util.stream.NotifyingBufferedOutputStream;
37
38 import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap;
39 import EDU.oswego.cs.dl.util.concurrent.LinkedQueue;
40 import EDU.oswego.cs.dl.util.concurrent.PooledExecutor;
41 import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
42 import EDU.oswego.cs.dl.util.concurrent.SynchronizedInt;
43 import EDU.oswego.cs.dl.util.concurrent.ThreadFactory;
44
45 /** Used to manage the client/server and server/client communication in an
46  * asynchronous manner.
47  *
48  * @todo verify the pooled executor config
49  *
50  * @author Scott.Stark@jboss.org
51  * @version $Revision: 45305 $
52  */

53 public class SocketManager
54 {
55    private static Logger log = Logger.getLogger(SocketManager.class);
56
57    private static final int STOPPED = 0;
58    private static final int STARTED = 1;
59    private static final int STOPPING = 2;
60    private static SynchronizedInt taskID = new SynchronizedInt(0);
61
62    /** The socket created by the IL layer */
63    private Socket JavaDoc socket;
64    /** The input stream used by the read task */
65    private ObjectInputStream JavaDoc in;
66    /** The buffering for output */
67    NotifyingBufferedInputStream bufferedInput;
68    /** The output stream used by the write task */
69    private ObjectOutputStream JavaDoc out;
70    /** The buffering for output */
71    NotifyingBufferedOutputStream bufferedOutput;
72    /** The write task thread */
73    private Thread JavaDoc writeThread;
74    /** The read task thread */
75    private Thread JavaDoc readThread;
76    /** The thread pool used to service incoming requests */
77    PooledExecutor pool;
78    /** The flag used to control the read loop */
79    private int readState = STOPPED;
80    /** The flag used to control the write loop */
81    private int writeState = STOPPED;
82    /** Used for constrolling the state */
83    private SynchronizedBoolean running = new SynchronizedBoolean(false);
84    /** The queue of messages to be processed by the write task */
85    private LinkedQueue sendQueue;
86    /** A HashMap<Integer, BaseMsg> that are awaiting a reply */
87    private ConcurrentHashMap replyMap;
88    /** The callback handler used for msgs that are not replys */
89    private SocketManagerHandler handler;
90    /** The buffer size */
91    private int bufferSize = 1;
92    /** The chunk size for notification of stream activity */
93    private int chunkSize = 0x40000000;
94    /** The logging trace level which is set in the ctor */
95    private boolean trace;
96
97    public SocketManager(Socket JavaDoc s) throws IOException JavaDoc
98    {
99       socket = s;
100       sendQueue = new LinkedQueue();
101       replyMap = new ConcurrentHashMap();
102       trace = log.isTraceEnabled();
103    }
104
105    /** Start the read and write threads using the given thread group and
106     * names of "UIL2.SocketManager.ReadTask" and "UIL2.SocketManager.WriteTask".
107     * @param tg the thread group to use for the read and write threads.
108     */

109    public void start(ThreadGroup JavaDoc tg)
110    {
111       if (trace)
112          log.trace("start called", new Exception JavaDoc("Start stack trace"));
113
114       InetAddress JavaDoc inetAddr = socket.getInetAddress();
115       String JavaDoc ipAddress = (inetAddr != null) ? inetAddr.getHostAddress() : "<unknown>";
116       ipAddress += ":" + socket.getPort();
117       if (pool == null)
118       {
119          // TODO: Check the validity of this config
120
pool = new PooledExecutor(5);
121          pool.setMinimumPoolSize(1);
122          pool.setKeepAliveTime(1000 * 60);
123          pool.runWhenBlocked();
124          String JavaDoc id = "SocketManager.MsgPool@"+
125             Integer.toHexString(System.identityHashCode(this))
126             + " client=" + ipAddress;
127          pool.setThreadFactory(new UILThreadFactory(id));
128       }
129
130       ReadTask readTask = new ReadTask();
131       readThread = new Thread JavaDoc(tg, readTask, "UIL2.SocketManager.ReadTask#" + taskID.increment() + " client=" + ipAddress);
132       readThread.setDaemon(true);
133
134       WriteTask writeTask = new WriteTask();
135       writeThread = new Thread JavaDoc(tg, writeTask, "UIL2.SocketManager.WriteTask#" + taskID.increment() + " client=" + ipAddress);
136       writeThread.setDaemon(true);
137       
138       synchronized (running)
139       {
140          readState = STARTED;
141          writeState = STARTED;
142          running.set(true);
143       }
144
145       readThread.start();
146       writeThread.start();
147    }
148
149    /** Stop the read and write threads by interrupting them.
150     */

151    public void stop()
152    {
153       synchronized (running)
154       {
155          if (readState == STARTED)
156          {
157             readState = STOPPING;
158             readThread.interrupt();
159          }
160          if (writeState == STARTED)
161          {
162             writeState = STOPPING;
163             writeThread.interrupt();
164          }
165          running.set(false);
166          if (pool != null)
167          {
168             pool.shutdownNow();
169             pool = null;
170          }
171       }
172    }
173
174    /** Set the callback handler for msgs that were not originated by the
175     * socket manager. This is any msgs read that was not sent via the
176     * sendMessage method.
177     *
178     * @param handler
179     */

180    public void setHandler(SocketManagerHandler handler)
181    {
182       this.handler = handler;
183       if (bufferedInput != null)
184          bufferedInput.setStreamListener(handler);
185       if (bufferedOutput != null)
186          bufferedOutput.setStreamListener(handler);
187    }
188
189    /**
190     * Sets the buffer size
191     *
192     * @param size the size of the buffer
193     */

194    public void setBufferSize(int size)
195    {
196       this.bufferSize = size;
197    }
198
199    /**
200     * Sets the chunk size
201     *
202     * @param size the size of a chunk
203     */

204    public void setChunkSize(int size)
205    {
206       this.chunkSize = size;
207    }
208
209    /** Send a two-way message and block the calling thread until the
210     * msg reply is received. This enques the msg to the sendQueue, places
211     * the msg in the replyMap and waits on the msg. The msg is notified by the
212     * read task thread when it finds a msg with a msgID that maps to the
213     * msg in the msgReply map.
214     *
215     * @param msg the request msg to send
216     * @throws Exception thrown if the reply message has an error value
217     */

218    public void sendMessage(BaseMsg msg) throws Exception JavaDoc
219    {
220       internalSendMessage(msg, true);
221       if (msg.error != null)
222       {
223          if (trace)
224             log.trace("sendMessage will throw error", msg.error);
225          throw msg.error;
226       }
227    }
228
229    /**
230     * Send a reply.
231     *
232     * @param msg the message
233     * @throws Exception for any error
234     */

235    public void sendReply(BaseMsg msg) throws Exception JavaDoc
236    {
237       msg.trimReply();
238       internalSendMessage(msg, false);
239    }
240
241    /**
242     * Send a one-way.
243     *
244     * @param msg the message
245     * @throws Exception for any error
246     */

247    public void sendOneWay(BaseMsg msg) throws Exception JavaDoc
248    {
249       msg.getMsgID();
250       internalSendMessage(msg, false);
251    }
252
253    /** This places the msg into the sendQueue and returns if waitOnReply
254     * is false, or enques the msg to the sendQueue, places the msg
255     * in the replyMap and waits on the msg.
256     *
257     * @param msg
258     * @param waitOnReply
259     * @throws Exception
260     */

261    private void internalSendMessage(BaseMsg msg, boolean waitOnReply) throws Exception JavaDoc
262    {
263       if (running.get() == false)
264          throw new IOException JavaDoc("Client is not connected");
265
266       if (waitOnReply)
267       { // Send a request msg and wait for the reply
268
synchronized (msg)
269          {
270             // Create the request msgID
271
msg.getMsgID();
272             if (trace)
273                log.trace("Begin internalSendMessage, round-trip msg=" + msg);
274             // Place the msg into the write queue and reply map
275
replyMap.put(msg, msg);
276             sendQueue.put(msg);
277             // Wait for the msg reply
278
msg.wait();
279          }
280       }
281       else
282       { // Send an asynchronous msg, typically a reply
283
if (trace)
284             log.trace("Begin internalSendMessage, one-way msg=" + msg);
285          sendQueue.put(msg);
286       }
287       if (trace)
288          log.trace("End internalSendMessage, msg=" + msg);
289    }
290
291    /** The task managing the socket read thread
292     *
293     */

294    public class ReadTask implements Runnable JavaDoc
295    {
296       public void run()
297       {
298          int msgType = 0;
299          log.debug("Begin ReadTask.run");
300          try
301          {
302             bufferedInput = new NotifyingBufferedInputStream(socket.getInputStream(), bufferSize, chunkSize, handler);
303             in = new ObjectInputStream JavaDoc(bufferedInput);
304             log.debug("Created ObjectInputStream");
305          }
306          catch (IOException JavaDoc e)
307          {
308             handleStop("Failed to create ObjectInputStream", e);
309             return;
310          }
311
312          while (true)
313          {
314             try
315             {
316                msgType = in.readByte();
317                int msgID = in.readInt();
318                if (trace)
319                   log.trace("Read msgType: " + BaseMsg.toString(msgType) + ", msgID: " + msgID);
320                // See if there is a msg awaiting a reply
321
BaseMsg key = new BaseMsg(msgType, msgID);
322                BaseMsg msg = (BaseMsg) replyMap.remove(key);
323                if (msg == null)
324                {
325                   msg = BaseMsg.createMsg(msgType);
326                   msg.setMsgID(msgID);
327                   msg.read(in);
328                   if (trace)
329                      log.trace("Read new msg: " + msg);
330
331                   // Handle the message
332
if (pool == null)
333                      break;
334                   msg.setHandler(this);
335                   pool.execute(msg);
336                }
337                else
338                {
339                   if (trace)
340                      log.trace("Found replyMap msg: " + msg);
341                   msg.setMsgID(msgID);
342                   try
343                   {
344                      msg.read(in);
345                      if (trace)
346                         log.trace("Read msg reply: " + msg);
347                   }
348                   catch (Throwable JavaDoc e)
349                   {
350                      // Forward the error to the waiting message
351
msg.setError(e);
352                      throw e;
353                   }
354                   // Always notify the waiting message
355
finally
356                   {
357                      synchronized (msg)
358                      {
359                         msg.notify();
360                      }
361                   }
362                }
363             }
364             catch (ClassNotFoundException JavaDoc e)
365             {
366                handleStop("Failed to read msgType:" + msgType, e);
367                break;
368             }
369             catch (IOException JavaDoc e)
370             {
371                handleStop("Exiting on IOE", e);
372                break;
373             }
374             catch (InterruptedException JavaDoc e)
375             {
376                handleStop("Exiting on interrupt", e);
377                break;
378             }
379             catch (Throwable JavaDoc e)
380             {
381                handleStop("Exiting on unexpected error in read task", e);
382                break;
383             }
384          }
385          log.debug("End ReadTask.run");
386       }
387
388       /**
389        * Handle the message or respond with an error
390        */

391       public void handleMsg(BaseMsg msg)
392       {
393          try
394          {
395             handler.handleMsg(msg);
396          }
397          catch (Throwable JavaDoc e)
398          {
399             if (e instanceof JMSException JavaDoc)
400                log.trace("Failed to handle: " + msg.toString(), e);
401             else if (e instanceof RuntimeException JavaDoc || e instanceof Error JavaDoc)
402                log.error("Failed to handle: " + msg.toString(), e);
403             else
404                log.debug("Failed to handle: " + msg.toString(), e);
405             msg.setError(e);
406             try
407             {
408                internalSendMessage(msg, false);
409             }
410             catch (Exception JavaDoc ie)
411             {
412                log.debug("Failed to send error reply", ie);
413             }
414          }
415       }
416
417       /**
418        * Stop the read thread
419        */

420       private void handleStop(String JavaDoc error, Throwable JavaDoc e)
421       {
422          synchronized (running)
423          {
424             readState = STOPPING;
425             running.set(false);
426          }
427
428          if (e instanceof IOException JavaDoc || e instanceof InterruptedException JavaDoc)
429          {
430             if (trace)
431                log.trace(error, e);
432          }
433          else
434             log.debug(error, e);
435
436          replyAll(e);
437          if (handler != null)
438          {
439             handler.asynchFailure(error, e);
440             handler.close();
441          }
442
443          synchronized (running)
444          {
445             readState = STOPPED;
446             if (writeState == STARTED)
447             {
448                writeState = STOPPING;
449                writeThread.interrupt();
450             }
451          }
452
453          try
454          {
455             in.close();
456          }
457          catch (Exception JavaDoc ignored)
458          {
459             if (trace)
460                log.trace(ignored.getMessage(), ignored);
461          }
462
463          try
464          {
465             socket.close();
466          }
467          catch (Exception JavaDoc ignored)
468          {
469             if (trace)
470                log.trace(ignored.getMessage(), ignored);
471          }
472       }
473
474       private void replyAll(Throwable JavaDoc e)
475       {
476          // Clear the interrupted state of the thread
477
Thread.interrupted();
478
479          for (Iterator JavaDoc iterator = replyMap.keySet().iterator(); iterator.hasNext();)
480          {
481             BaseMsg msg = (BaseMsg) iterator.next();
482             msg.setError(e);
483             synchronized (msg)
484             {
485                msg.notify();
486             }
487             iterator.remove();
488          }
489       }
490    }
491
492    /** The task managing the socket write thread
493     *
494     */

495    public class WriteTask implements Runnable JavaDoc
496    {
497       public void run()
498       {
499          log.debug("Begin WriteTask.run");
500          try
501          {
502             bufferedOutput =
503                new NotifyingBufferedOutputStream(socket.getOutputStream(), bufferSize, chunkSize, handler);
504             out = new ObjectOutputStream JavaDoc(bufferedOutput);
505             log.debug("Created ObjectOutputStream");
506          }
507          catch (IOException JavaDoc e)
508          {
509             handleStop(null, "Failed to create ObjectOutputStream", e);
510             return;
511          }
512
513          while (true)
514          {
515             BaseMsg msg = null;
516             try
517             {
518                msg = (BaseMsg) sendQueue.take();
519                if (trace)
520                   log.trace("Write msg: " + msg);
521                msg.write(out);
522                out.reset();
523                out.flush();
524             }
525             catch (InterruptedException JavaDoc e)
526             {
527                handleStop(msg, "WriteTask was interrupted", e);
528                break;
529             }
530             catch (IOException JavaDoc e)
531             {
532                handleStop(msg, "Exiting on IOE", e);
533                break;
534             }
535             catch (Throwable JavaDoc e)
536             {
537                handleStop(msg, "Failed to write msgType:" + msg, e);
538                break;
539             }
540          }
541          log.debug("End WriteTask.run");
542       }
543
544       /**
545        * Stop the write thread
546        */

547       private void handleStop(BaseMsg msg, String JavaDoc error, Throwable JavaDoc e)
548       {
549          synchronized (running)
550          {
551             writeState = STOPPING;
552             running.set(false);
553          }
554
555          if (e instanceof InterruptedException JavaDoc || e instanceof IOException JavaDoc)
556          {
557             if (trace)
558                log.trace(error, e);
559          }
560          else
561             log.debug(error, e);
562
563          if (msg != null)
564          {
565             msg.setError(e);
566             synchronized (msg)
567             {
568                msg.notify();
569             }
570          }
571
572          synchronized (running)
573          {
574             writeState = STOPPED;
575             if (readState == STARTED)
576             {
577                readState = STOPPING;
578                readThread.interrupt();
579             }
580          }
581
582          try
583          {
584             out.close();
585          }
586          catch (Exception JavaDoc ignored)
587          {
588             if (trace)
589                log.trace(ignored.getMessage(), ignored);
590          }
591
592          try
593          {
594             socket.close();
595          }
596          catch (Exception JavaDoc ignored)
597          {
598             if (trace)
599                log.trace(ignored.getMessage(), ignored);
600          }
601       }
602    }
603
604    static class UILThreadFactory implements ThreadFactory
605    {
606       private String JavaDoc id;
607       private int count;
608       
609       UILThreadFactory(String JavaDoc id)
610       {
611          this.id = id;
612       }
613       public Thread JavaDoc newThread(Runnable JavaDoc command)
614       {
615          synchronized( this )
616          {
617             count ++;
618          }
619          Thread JavaDoc t = new Thread JavaDoc(command, "UIL2("+id+")#"+count);
620          return t;
621       }
622    }
623 }
624
Popular Tags